MungingData

den här bloggen förklarar hur man skriver ut en DataFrame till en enda fil med Spark. Den beskriver också hur man skriver ut data i en fil med ett specifikt namn, vilket är förvånansvärt utmanande.

att skriva ut en enda fil med Spark är inte typiskt. Spark är utformad för att skriva ut flera filer parallellt. Att skriva ut många filer samtidigt är snabbare för stora datamängder.

standardbeteende

Låt oss skapa en DataFrame, använd repartition(3) för att skapa tre minnespartitioner och skriv sedan ut filen till disken.

val df = Seq("one", "two", "three").toDF("num")df .repartition(3) .write.csv(sys.env("HOME")+ "/Documents/tmp/some-files")

här är filerna som genereras på disken.

Documents/ tmp/ some-files/ _SUCCESS part-00000-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00001-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00002-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv

Spark skriver ut en fil per minnespartition. Vi använde repartition(3) för att skapa tre minnespartitioner, så tre filer skrevs.

skriva ut en fil med ompartition

vi kan använda repartition(1) skriva ut en enda fil.

df .repartition(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-repartition")

här är filen som skrivs till disken.

Documents/ tmp/ one-file-repartition/ _SUCCESS part-00000-d5a15f40-e787-4fd2-b8eb-c810d973b6fe-c000.csv

vi kan inte kontrollera namnet på filen som skrivs. Vi kan styra namnet på katalogen, men inte själva filen.

den här lösningen räcker inte när du vill skriva data till en fil med ett specifikt namn.

skriva ut en enda fil med coalesce

vi kan också använda coalesce(1) för att skriva ut en enda fil.

df .coalesce(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-coalesce")

här är vad som matas ut.

Documents/ tmp/ one-file-coalesce/ _SUCCESS part-00000-c7521799-e6d8-498d-b857-2aba7f56533a-c000.csv

coalesce låter oss inte heller ange ett specifikt filnamn (det låter oss bara anpassa mappnamnet). Vi måste använda spark-daria för att komma åt en metod som kommer att mata ut en enda fil.

skriva ut en fil med ett specifikt namn

du kan använda funktionen DariaWriters.writeSingleFile definierad i spark-daria för att skriva ut en enda fil med ett specifikt filnamn.

här är koden som skriver ut innehållet i en DataFrame till filen ~/Documents/better/mydata.csv.

import com.github.mrpowers.spark.daria.sql.DariaWritersDariaWriters.writeSingleFile( df = df, format = "csv", sc = spark.sparkContext, tmpFolder = sys.env("HOME") + "/Documents/better/tmp", filename = sys.env("HOME") + "/Documents/better/mydata.csv")

metoden writeSingleFile låter dig namnge filen utan att oroa dig för komplicerade implementeringsdetaljer.

writeSingleFile is använder repartition(1) och Hadoop-filsystemmetoder under huven. Alla Hadoop-filsystemmetoder är tillgängliga i alla Spark runtime – miljöer-du behöver inte bifoga några separata burkar.

kompatibilitet med andra filsystem

det är bäst att använda Hadoop-filsystemmetoderna när du flyttar, byter namn eller tar bort filer, så din kod fungerar på flera plattformar. writeSingleFile fungerar på ditt lokala filsystem och i S3. Du kan använda detta tillvägagångssätt när du kör Spark lokalt eller i en Databricks-anteckningsbok.

det finns andra lösningar på detta problem som inte är plattformsoberoende. Det finns lösningar som bara fungerar i Databricks-bärbara datorer, eller bara fungerar i S3, eller bara fungerar på ett Unix-liknande operativsystem.

Hadoop-filsystemmetoderna är klumpiga att arbeta med, men det bästa alternativet för att de fungerar på flera plattformar.

writeSingleFile – metoden använder fs.rename() Hadoop-metoden, som beskrivs i detta svar. Här är psuedocode:

val src = new Path("s3a://bucket/data/src")val dest = new Path("s3a://bucket/data/dest")val conf = sc.hadoopConfiguration // assuming sc = spark contextval fs = src.getFileSystem(conf)fs.rename(src, dest)

copyMerge

Hadoop 2 har en FileUtil.copyMerge() – metod som är en elegant lösning på detta problem, men den här metoden är föråldrad och kommer att tas bort i Hadoop 3. Det finns ett svar i den här tråden som reimplements copyMerge för Hadoop 3-användare.

skriv i alla fall inte kod som bygger på FileUtil.copyMerge() – metoden. Vi vet att metoden kommer att vara otillgänglig när Spark uppgraderar till Hadoop 3 och du vill inte lita på en föråldrad metod som kommer att bryta vid någon okänd tid i framtiden.

nästa steg

du vill vanligtvis skriva ut flera filer parallellt, men i sällsynta fall när du vill skriva ut en enda fil hjälper spark-daria writeSingleFile – metoden.

gör ditt bästa för att slå in den komplexa Hadoop-filsystemlogiken i hjälpmetoder som testas separerade. Att kombinera Hadoop-filsystemoperationer och Spark-kod i samma metod gör din kod för komplex.

You might also like

Lämna ett svar

Din e-postadress kommer inte publiceras.