ten blog wyjaśnia, jak zapisać ramkę danych do pojedynczego pliku za pomocą Spark. Opisuje również, jak zapisać dane w pliku o określonej nazwie, co jest zaskakująco trudne.
wypisywanie pojedynczego pliku za pomocą Spark nie jest typowe. Spark jest przeznaczony do zapisywania wielu plików równolegle. Zapisywanie wielu plików w tym samym czasie jest szybsze dla dużych zbiorów danych.
zachowanie domyślne
stwórzmy ramkę danych, użyj repartition(3)
, aby utworzyć trzy partycje pamięci, a następnie zapisz plik na dysk.
val df = Seq("one", "two", "three").toDF("num")df .repartition(3) .write.csv(sys.env("HOME")+ "/Documents/tmp/some-files")
oto pliki, które są generowane na dysku.
Documents/ tmp/ some-files/ _SUCCESS part-00000-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00001-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00002-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv
Spark zapisuje jeden plik na partycję pamięci. Użyliśmy repartition(3)
do utworzenia trzech partycji pamięci, więc zapisano trzy pliki.
zapisanie jednego pliku z repartition
możemy użyć repartition(1)
zapisanie jednego pliku.
df .repartition(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-repartition")
oto plik zapisany na dysk.
Documents/ tmp/ one-file-repartition/ _SUCCESS part-00000-d5a15f40-e787-4fd2-b8eb-c810d973b6fe-c000.csv
nie możemy kontrolować nazwy zapisanego pliku. Możemy kontrolować nazwę katalogu, ale nie sam plik.
To rozwiązanie nie jest wystarczające, gdy chcesz zapisać dane do pliku o określonej nazwie.
zapisywanie pojedynczego pliku za pomocą coalesce
możemy również użyć coalesce(1)
, aby zapisać pojedynczy plik.
df .coalesce(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-coalesce")
Documents/ tmp/ one-file-coalesce/ _SUCCESS part-00000-c7521799-e6d8-498d-b857-2aba7f56533a-c000.csv
coalesce nie pozwala nam również ustawić określonej nazwy pliku (pozwala tylko dostosować nazwę folderu). Musimy użyć spark-daria, aby uzyskać dostęp do metody, która wyświetli pojedynczy plik.
Zapisywanie pliku o określonej nazwie
możesz użyć funkcji DariaWriters.writeSingleFile
zdefiniowanej w spark-daria, aby zapisać pojedynczy plik o określonej nazwie.
oto kod, który zapisuje zawartość ramki danych do pliku ~/Documents/better/mydata.csv
.
import com.github.mrpowers.spark.daria.sql.DariaWritersDariaWriters.writeSingleFile( df = df, format = "csv", sc = spark.sparkContext, tmpFolder = sys.env("HOME") + "/Documents/better/tmp", filename = sys.env("HOME") + "/Documents/better/mydata.csv")
metoda writeSingleFile
pozwala nazwać plik bez martwienia się o skomplikowane szczegóły implementacji.
writeSingleFile
używa repartition(1)
i metod systemu plików Hadoop pod maską. Wszystkie metody systemu plików Hadoop są dostępne w dowolnym środowisku uruchomieniowym Spark – nie musisz dołączać żadnych oddzielnych plików jar.
kompatybilność z innymi systemami plików
podczas przenoszenia, zmiany nazwy lub usuwania plików najlepiej używać metod systemu plików Hadoop, aby Kod działał na wielu platformach. writeSingleFile
działa na twoim lokalnym systemie plików i w S3. Możesz użyć tego podejścia podczas uruchamiania Spark lokalnie lub w notatniku Databricks.
istnieją inne rozwiązania tego problemu, które nie są cross platform. Istnieją rozwiązania, które działają tylko w notebookach Databricks, lub działają tylko w S3, lub działają tylko na Uniksopodobnym systemie operacyjnym.
metody systemu plików Hadoop są niezgrabne w pracy, ale najlepsza opcja powoduje, że działają na wielu platformach.
metoda writeSingleFile
wykorzystuje metodę fs.rename()
Hadoop, jak opisano w tej odpowiedzi. Oto kod psuedocode:
val src = new Path("s3a://bucket/data/src")val dest = new Path("s3a://bucket/data/dest")val conf = sc.hadoopConfiguration // assuming sc = spark contextval fs = src.getFileSystem(conf)fs.rename(src, dest)
copyMerge
Hadoop 2 ma metodę FileUtil.copyMerge()
, która jest eleganckim rozwiązaniem tego problemu, ale ta metoda jest przestarzała i zostanie usunięta w Hadoop 3. W tym wątku jest odpowiedź, że reimplements copyMerge
dla użytkowników Hadoop 3.
w każdym razie nie pisz kodu, który opiera się na metodzie FileUtil.copyMerge()
. Wiemy, że metoda będzie niedostępna, gdy Spark zaktualizuje się do Hadoop 3 i nie chcesz polegać na przestarzałej metodzie, która złamie się w nieznanym czasie w przyszłości.
Następne kroki
zazwyczaj chcesz zapisać wiele plików jednocześnie, ale w rzadkich przypadkach, gdy chcesz zapisać jeden plik, metoda spark-daria writeSingleFile
pomoże.
Postaraj się zawinąć złożoną logikę systemu plików Hadoop w metody pomocnicze, które są testowane oddzielnie. Połączenie operacji na systemie plików Hadoop i kodu Spark w tej samej metodzie sprawi, że Twój kod będzie zbyt złożony.