MungingData

ten blog wyjaśnia, jak zapisać ramkę danych do pojedynczego pliku za pomocą Spark. Opisuje również, jak zapisać dane w pliku o określonej nazwie, co jest zaskakująco trudne.

wypisywanie pojedynczego pliku za pomocą Spark nie jest typowe. Spark jest przeznaczony do zapisywania wielu plików równolegle. Zapisywanie wielu plików w tym samym czasie jest szybsze dla dużych zbiorów danych.

zachowanie domyślne

stwórzmy ramkę danych, użyj repartition(3), aby utworzyć trzy partycje pamięci, a następnie zapisz plik na dysk.

val df = Seq("one", "two", "three").toDF("num")df .repartition(3) .write.csv(sys.env("HOME")+ "/Documents/tmp/some-files")

oto pliki, które są generowane na dysku.

Documents/ tmp/ some-files/ _SUCCESS part-00000-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00001-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00002-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv

Spark zapisuje jeden plik na partycję pamięci. Użyliśmy repartition(3) do utworzenia trzech partycji pamięci, więc zapisano trzy pliki.

zapisanie jednego pliku z repartition

możemy użyć repartition(1) zapisanie jednego pliku.

df .repartition(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-repartition")

oto plik zapisany na dysk.

Documents/ tmp/ one-file-repartition/ _SUCCESS part-00000-d5a15f40-e787-4fd2-b8eb-c810d973b6fe-c000.csv

nie możemy kontrolować nazwy zapisanego pliku. Możemy kontrolować nazwę katalogu, ale nie sam plik.

To rozwiązanie nie jest wystarczające, gdy chcesz zapisać dane do pliku o określonej nazwie.

zapisywanie pojedynczego pliku za pomocą coalesce

możemy również użyć coalesce(1), aby zapisać pojedynczy plik.

df .coalesce(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-coalesce")

Documents/ tmp/ one-file-coalesce/ _SUCCESS part-00000-c7521799-e6d8-498d-b857-2aba7f56533a-c000.csv

coalesce nie pozwala nam również ustawić określonej nazwy pliku (pozwala tylko dostosować nazwę folderu). Musimy użyć spark-daria, aby uzyskać dostęp do metody, która wyświetli pojedynczy plik.

Zapisywanie pliku o określonej nazwie

możesz użyć funkcji DariaWriters.writeSingleFile zdefiniowanej w spark-daria, aby zapisać pojedynczy plik o określonej nazwie.

oto kod, który zapisuje zawartość ramki danych do pliku ~/Documents/better/mydata.csv.

import com.github.mrpowers.spark.daria.sql.DariaWritersDariaWriters.writeSingleFile( df = df, format = "csv", sc = spark.sparkContext, tmpFolder = sys.env("HOME") + "/Documents/better/tmp", filename = sys.env("HOME") + "/Documents/better/mydata.csv")

metoda writeSingleFile pozwala nazwać plik bez martwienia się o skomplikowane szczegóły implementacji.

writeSingleFile używa repartition(1) i metod systemu plików Hadoop pod maską. Wszystkie metody systemu plików Hadoop są dostępne w dowolnym środowisku uruchomieniowym Spark – nie musisz dołączać żadnych oddzielnych plików jar.

kompatybilność z innymi systemami plików

podczas przenoszenia, zmiany nazwy lub usuwania plików najlepiej używać metod systemu plików Hadoop, aby Kod działał na wielu platformach. writeSingleFile działa na twoim lokalnym systemie plików i w S3. Możesz użyć tego podejścia podczas uruchamiania Spark lokalnie lub w notatniku Databricks.

istnieją inne rozwiązania tego problemu, które nie są cross platform. Istnieją rozwiązania, które działają tylko w notebookach Databricks, lub działają tylko w S3, lub działają tylko na Uniksopodobnym systemie operacyjnym.

metody systemu plików Hadoop są niezgrabne w pracy, ale najlepsza opcja powoduje, że działają na wielu platformach.

metoda writeSingleFile wykorzystuje metodę fs.rename() Hadoop, jak opisano w tej odpowiedzi. Oto kod psuedocode:

val src = new Path("s3a://bucket/data/src")val dest = new Path("s3a://bucket/data/dest")val conf = sc.hadoopConfiguration // assuming sc = spark contextval fs = src.getFileSystem(conf)fs.rename(src, dest)

copyMerge

Hadoop 2 ma metodę FileUtil.copyMerge(), która jest eleganckim rozwiązaniem tego problemu, ale ta metoda jest przestarzała i zostanie usunięta w Hadoop 3. W tym wątku jest odpowiedź, że reimplements copyMerge dla użytkowników Hadoop 3.

w każdym razie nie pisz kodu, który opiera się na metodzie FileUtil.copyMerge(). Wiemy, że metoda będzie niedostępna, gdy Spark zaktualizuje się do Hadoop 3 i nie chcesz polegać na przestarzałej metodzie, która złamie się w nieznanym czasie w przyszłości.

Następne kroki

zazwyczaj chcesz zapisać wiele plików jednocześnie, ale w rzadkich przypadkach, gdy chcesz zapisać jeden plik, metoda spark-daria writeSingleFile pomoże.

Postaraj się zawinąć złożoną logikę systemu plików Hadoop w metody pomocnicze, które są testowane oddzielnie. Połączenie operacji na systemie plików Hadoop i kodu Spark w tej samej metodzie sprawi, że Twój kod będzie zbyt złożony.

You might also like

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany.