MungingData

In diesem Blog wird erläutert, wie Sie mit Spark einen Datenrahmen in eine einzelne Datei schreiben. Es wird auch beschrieben, wie Daten in eine Datei mit einem bestimmten Namen geschrieben werden, was überraschend schwierig ist.

Das Schreiben einer einzelnen Datei mit Spark ist nicht typisch. Spark wurde entwickelt, um mehrere Dateien parallel zu schreiben. Das gleichzeitige Schreiben vieler Dateien ist bei großen Datensätzen schneller.

Standardverhalten

Erstellen wir einen Datenrahmen, erstellen mit repartition(3) drei Speicherpartitionen und schreiben die Datei dann auf die Festplatte.

val df = Seq("one", "two", "three").toDF("num")df .repartition(3) .write.csv(sys.env("HOME")+ "/Documents/tmp/some-files")

Hier sind die Dateien, die auf der Festplatte generiert werden.

Documents/ tmp/ some-files/ _SUCCESS part-00000-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00001-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00002-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv

Spark schreibt eine Datei pro Speicherpartition aus. Wir haben repartition(3) verwendet, um drei Speicherpartitionen zu erstellen, sodass drei Dateien geschrieben wurden.

Eine Datei mit repartition ausschreiben

Wir können repartition(1) eine einzelne Datei ausschreiben.

df .repartition(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-repartition")

Hier ist die Datei, die auf die Festplatte geschrieben wird.

Documents/ tmp/ one-file-repartition/ _SUCCESS part-00000-d5a15f40-e787-4fd2-b8eb-c810d973b6fe-c000.csv

Wir können den Namen der geschriebenen Datei nicht kontrollieren. Wir können den Namen des Verzeichnisses steuern, aber nicht die Datei selbst.

Diese Lösung reicht nicht aus, wenn Sie Daten in eine Datei mit einem bestimmten Namen schreiben möchten.

Schreiben einer einzelnen Datei mit coalesce

Wir können auch coalesce(1) verwenden, um eine einzelne Datei zu schreiben.

df .coalesce(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-coalesce")

Hier wird ausgegeben.

Documents/ tmp/ one-file-coalesce/ _SUCCESS part-00000-c7521799-e6d8-498d-b857-2aba7f56533a-c000.csv

Mit Coalesce können wir auch keinen bestimmten Dateinamen festlegen (wir können nur den Ordnernamen anpassen). Wir müssen spark-daria verwenden, um auf eine Methode zuzugreifen, die eine einzelne Datei ausgibt.

Schreiben einer Datei mit einem bestimmten Namen

Mit der in spark-daria definierten Funktion DariaWriters.writeSingleFile können Sie eine einzelne Datei mit einem bestimmten Dateinamen schreiben.

Hier ist der Code, der den Inhalt eines Datenrahmens in die ~/Documents/better/mydata.csv -Datei schreibt.

import com.github.mrpowers.spark.daria.sql.DariaWritersDariaWriters.writeSingleFile( df = df, format = "csv", sc = spark.sparkContext, tmpFolder = sys.env("HOME") + "/Documents/better/tmp", filename = sys.env("HOME") + "/Documents/better/mydata.csv")

Mit der writeSingleFile -Methode können Sie die Datei benennen, ohne sich um komplizierte Implementierungsdetails kümmern zu müssen.

writeSingleFile verwendet repartition(1) und Hadoop-Dateisystemmethoden unter der Haube. Alle Hadoop-Dateisystemmethoden sind in jeder Spark-Laufzeitumgebung verfügbar – Sie müssen keine separaten JARs anhängen.

Kompatibilität mit anderen Dateisystemen

Verwenden Sie beim Verschieben, Umbenennen oder Löschen von Dateien am besten die Hadoop-Dateisystemmethoden, damit Ihr Code auf mehreren Plattformen funktioniert. writeSingleFile funktioniert auf Ihrem lokalen Dateisystem und in S3. Sie können diesen Ansatz verwenden, wenn Sie Spark lokal oder in einem Databricks-Notebook ausführen.

Es gibt andere Lösungen für dieses Problem, die nicht plattformübergreifend sind. Es gibt Lösungen, die nur in Databricks-Notebooks oder nur in S3 oder nur auf einem Unix-ähnlichen Betriebssystem funktionieren.

Die Hadoop-Dateisystemmethoden sind ungeschickt, aber die beste Option, weil sie auf mehreren Plattformen funktionieren.

Die writeSingleFile -Methode verwendet die fs.rename() Hadoop-Methode, wie in dieser Antwort beschrieben. Hier ist der Psuedocode:

val src = new Path("s3a://bucket/data/src")val dest = new Path("s3a://bucket/data/dest")val conf = sc.hadoopConfiguration // assuming sc = spark contextval fs = src.getFileSystem(conf)fs.rename(src, dest)

copyMerge

Hadoop 2 verfügt über eine FileUtil.copyMerge() -Methode, die eine elegante Lösung für dieses Problem darstellt. In diesem Thread gibt es eine Antwort, die copyMerge für Hadoop 3-Benutzer neu implementiert.

Schreiben Sie auf keinen Fall Code, der auf der Methode FileUtil.copyMerge() basiert. Wir wissen, dass auf diese Methode nicht zugegriffen werden kann, wenn Spark auf Hadoop 3 aktualisiert wird, und Sie möchten sich nicht auf eine veraltete Methode verlassen, die zu einem unbekannten Zeitpunkt in der Zukunft unterbrochen wird.

Nächste Schritte

Normalerweise möchten Sie mehrere Dateien parallel schreiben, aber in den seltenen Fällen, in denen Sie eine einzelne Datei schreiben möchten, hilft die spark-daria writeSingleFile -Methode.

Versuchen Sie Ihr Bestes, um die komplexe Hadoop-Dateisystemlogik in Hilfsmethoden zu verpacken, die getrennt getestet werden. Wenn Sie Hadoop-Dateisystemoperationen und Spark-Code in derselben Methode kombinieren, wird Ihr Code zu komplex.

You might also like

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht.