MungingData

denne blog forklarer, hvordan man skriver en DataFrame til en enkelt fil med Spark. Det beskriver også, hvordan man skriver data i en fil med et bestemt navn, hvilket er overraskende udfordrende.

at skrive en enkelt fil med Spark er ikke typisk. Spark er designet til at skrive flere filer parallelt. At skrive mange filer på samme tid er hurtigere for store datasæt.

standardadfærd

lad os oprette en DataFrame, bruge repartition(3) til at oprette tre hukommelsespartitioner og derefter skrive filen til disken.

val df = Seq("one", "two", "three").toDF("num")df .repartition(3) .write.csv(sys.env("HOME")+ "/Documents/tmp/some-files")

her er de filer, der genereres på disken.

Documents/ tmp/ some-files/ _SUCCESS part-00000-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00001-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00002-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv

Spark skriver en fil pr. Vi brugte repartition(3) til at oprette tre hukommelsespartitioner, så tre filer blev skrevet.

skrivning af en fil med repartition

vi kan bruge repartition(1) skrive en enkelt fil.

df .repartition(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-repartition")

her er den fil, der er skrevet til disk.

Documents/ tmp/ one-file-repartition/ _SUCCESS part-00000-d5a15f40-e787-4fd2-b8eb-c810d973b6fe-c000.csv

vi kan ikke kontrollere navnet på den fil, der er skrevet. Vi kan kontrollere navnet på mappen, men ikke selve filen.

denne løsning er ikke tilstrækkelig, når du vil skrive data til en fil med et bestemt navn.

skrivning af en enkelt fil med koalesce

vi kan også bruge coalesce(1) til at skrive en enkelt fil.

df .coalesce(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-coalesce")

her er hvad der udsendes.

Documents/ tmp/ one-file-coalesce/ _SUCCESS part-00000-c7521799-e6d8-498d-b857-2aba7f56533a-c000.csv

coalesce lader os heller ikke indstille et specifikt filnavn (det lader os kun tilpasse mappenavnet). Vi bliver nødt til at bruge spark-daria for at få adgang til en metode, der udsender en enkelt fil.

skrivning af en fil med et specifikt navn

du kan bruge funktionen DariaWriters.writeSingleFile defineret i spark-daria til at skrive en enkelt fil med et specifikt filnavn.

her er koden, der skriver indholdet af en DataFrame til ~/Documents/better/mydata.csv filen.

import com.github.mrpowers.spark.daria.sql.DariaWritersDariaWriters.writeSingleFile( df = df, format = "csv", sc = spark.sparkContext, tmpFolder = sys.env("HOME") + "/Documents/better/tmp", filename = sys.env("HOME") + "/Documents/better/mydata.csv")

metoden writeSingleFile lader dig navngive filen uden at bekymre dig om komplicerede implementeringsdetaljer.

writeSingleFile is bruger repartition(1) og Hadoop filsystem metoder under hætten. Alle Hadoop-filsystemmetoderne er tilgængelige i ethvert Spark runtime – miljø-du behøver ikke at vedhæfte separate krukker.

kompatibilitet med andre filsystemer

det er bedst at bruge Hadoop-filsystemmetoderne, når du flytter, omdøber eller sletter filer, så din kode fungerer på flere platforme. writeSingleFile virker på dit lokale filsystem og i S3. Du kan bruge denne fremgangsmåde, når du kører Spark lokalt eller i en Databricks notebook.

der er andre løsninger på dette problem, der ikke er cross platform. Der er løsninger, der kun fungerer i Databricks notebooks, eller kun fungerer i S3, eller kun fungerer på et unikt operativsystem.

Hadoop-filsystemmetoderne er klodsede at arbejde med, men den bedste mulighed, fordi de arbejder på flere platforme.

writeSingleFile – metoden bruger fs.rename() Hadoop-metoden som beskrevet i dette svar. Her er psuedokoden:

val src = new Path("s3a://bucket/data/src")val dest = new Path("s3a://bucket/data/dest")val conf = sc.hadoopConfiguration // assuming sc = spark contextval fs = src.getFileSystem(conf)fs.rename(src, dest)

Copymerge

Hadoop 2 har en FileUtil.copyMerge() metode, der er en elegant løsning på dette problem, men denne metode er forældet og vil blive fjernet i Hadoop 3. Der er et svar i denne tråd, der genimplementerer copyMerge for Hadoop 3 brugere.

skriv under alle omstændigheder ikke kode, der er afhængig af FileUtil.copyMerge() – metoden. Vi ved, at metoden vil være utilgængelig, når Spark opgraderinger til Hadoop 3 og du ikke ønsker at stole på en forældet metode, der vil bryde på et ukendt tidspunkt i fremtiden.

næste trin

du vil typisk skrive flere filer parallelt, men i de sjældne tilfælde, når du vil skrive en enkelt fil, vil spark-daria writeSingleFile – metoden hjælpe.

Prøv dit bedste for at pakke den komplekse Hadoop-filsystemlogik i hjælpemetoder, der testes adskilt. Ved at kombinere Hadoop filsystem operationer og gnist kode i samme metode vil gøre din kode for kompleks.

You might also like

Skriv et svar

Din e-mailadresse vil ikke blive publiceret.