acest blog explică cum să scrieți un cadru de date într-un singur fișier cu Spark. De asemenea, descrie modul de scriere a datelor într-un fișier cu un nume specific, ceea ce este surprinzător de provocator.
scrierea unui singur fișier cu Spark nu este tipică. Spark este conceput pentru a scrie mai multe fișiere în paralel. Scrierea mai multor fișiere în același timp este mai rapidă pentru seturile de date mari.
comportament implicit
să creăm un cadru de date, să folosim repartition(3)
pentru a crea trei partiții de memorie, apoi să scriem fișierul pe disc.
val df = Seq("one", "two", "three").toDF("num")df .repartition(3) .write.csv(sys.env("HOME")+ "/Documents/tmp/some-files")
Iată fișierele care sunt generate pe disc.
Documents/ tmp/ some-files/ _SUCCESS part-00000-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00001-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00002-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv
Spark scrie un fișier pe partiție de memorie. Am folosit repartition(3)
pentru a crea trei partiții de memorie, astfel încât au fost scrise trei fișiere.
scrierea unui fișier cu repartiție
putem folosi repartition(1)
scrie un singur fișier.
df .repartition(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-repartition")
Iată fișierul care este scris pe disc.
Documents/ tmp/ one-file-repartition/ _SUCCESS part-00000-d5a15f40-e787-4fd2-b8eb-c810d973b6fe-c000.csv
nu putem controla numele fișierului scris. Putem controla numele directorului, dar nu fișierul în sine.
această soluție nu este suficientă atunci când doriți să scrieți date într-un fișier cu un anumit nume.
scrierea unui singur fișier cu coalesce
de asemenea, putem folosi coalesce(1)
pentru a scrie un singur fișier.
df .coalesce(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-coalesce")
Iată ce este scos.
Documents/ tmp/ one-file-coalesce/ _SUCCESS part-00000-c7521799-e6d8-498d-b857-2aba7f56533a-c000.csv
coalesce nu ne permite să setați un anumit nume de fișier, fie (doar să ne personaliza numele folderului). Va trebui să folosim spark-daria pentru a accesa o metodă care va afișa un singur fișier.
scrierea unui fișier cu un anumit nume
puteți utiliza funcția DariaWriters.writeSingleFile
definită în spark-daria pentru a scrie un singur fișier cu un anumit nume de fișier.
iată codul care scrie conținutul unui cadru de date în fișierul ~/Documents/better/mydata.csv
.
import com.github.mrpowers.spark.daria.sql.DariaWritersDariaWriters.writeSingleFile( df = df, format = "csv", sc = spark.sparkContext, tmpFolder = sys.env("HOME") + "/Documents/better/tmp", filename = sys.env("HOME") + "/Documents/better/mydata.csv")
metoda writeSingleFile
vă permite să denumiți fișierul fără să vă faceți griji cu privire la detaliile complicate de implementare.
writeSingleFile
este utilizează repartition(1)
și Hadoop metode de sistem de fișiere sub capota. Toate metodele sistemului de fișiere Hadoop sunt disponibile în orice mediu de rulare Spark – nu este nevoie să atașați borcane separate.
compatibilitate cu alte sisteme de fișiere
cel mai bine este să utilizați metodele sistemului de fișiere Hadoop atunci când mutați, redenumiți sau ștergeți fișiere, astfel încât codul dvs. să funcționeze pe mai multe platforme. writeSingleFile
funcționează pe sistemul de fișiere local și în S3. Puteți utiliza această abordare atunci când rulați Spark local sau într-un notebook Databricks.
există și alte soluții la această problemă care nu sunt cross platform. Există soluții care funcționează numai în notebook-urile Databricks sau funcționează numai în S3 sau funcționează numai pe un sistem de operare asemănător Unix.
metodele sistemului de fișiere Hadoop sunt stângace de lucrat, dar cea mai bună opțiune pentru că funcționează pe mai multe platforme.
metoda writeSingleFile
folosește metoda Hadoop fs.rename()
, așa cum este descris în acest răspuns. Iată codul psuedocode:
val src = new Path("s3a://bucket/data/src")val dest = new Path("s3a://bucket/data/dest")val conf = sc.hadoopConfiguration // assuming sc = spark contextval fs = src.getFileSystem(conf)fs.rename(src, dest)
copyMerge
Hadoop 2 are o metodă FileUtil.copyMerge()
care este o soluție elegantă la această problemă, dar această metodă este învechită și va fi eliminată în Hadoop 3. Există un răspuns în acest thread care reimplementează copyMerge
pentru utilizatorii Hadoop 3.
în orice caz, nu scrieți cod care se bazează pe metoda FileUtil.copyMerge()
. Știm că metoda va fi inaccesibilă atunci când upgrade-urile Spark la Hadoop 3 și nu doriți să vă bazați pe o metodă învechită care se va rupe la un moment necunoscut în viitor.
pașii următori
de obicei veți dori să scrieți Mai multe fișiere în paralel, dar în rarele ocazii când doriți să scrieți un singur fișier, metoda spark-Daria writeSingleFile
vă va ajuta.
încercați tot posibilul să înfășurați logica complexă a sistemului de fișiere Hadoop în metode de ajutor care sunt testate separate. Combinarea operațiunilor sistemului de fișiere Hadoop și a Codului Spark în aceeași metodă va face codul dvs. prea complex.