MungingData

acest blog explică cum să scrieți un cadru de date într-un singur fișier cu Spark. De asemenea, descrie modul de scriere a datelor într-un fișier cu un nume specific, ceea ce este surprinzător de provocator.

scrierea unui singur fișier cu Spark nu este tipică. Spark este conceput pentru a scrie mai multe fișiere în paralel. Scrierea mai multor fișiere în același timp este mai rapidă pentru seturile de date mari.

comportament implicit

să creăm un cadru de date, să folosim repartition(3) pentru a crea trei partiții de memorie, apoi să scriem fișierul pe disc.

val df = Seq("one", "two", "three").toDF("num")df .repartition(3) .write.csv(sys.env("HOME")+ "/Documents/tmp/some-files")

Iată fișierele care sunt generate pe disc.

Documents/ tmp/ some-files/ _SUCCESS part-00000-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00001-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00002-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv

Spark scrie un fișier pe partiție de memorie. Am folosit repartition(3) pentru a crea trei partiții de memorie, astfel încât au fost scrise trei fișiere.

scrierea unui fișier cu repartiție

putem folosi repartition(1) scrie un singur fișier.

df .repartition(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-repartition")

Iată fișierul care este scris pe disc.

Documents/ tmp/ one-file-repartition/ _SUCCESS part-00000-d5a15f40-e787-4fd2-b8eb-c810d973b6fe-c000.csv

nu putem controla numele fișierului scris. Putem controla numele directorului, dar nu fișierul în sine.

această soluție nu este suficientă atunci când doriți să scrieți date într-un fișier cu un anumit nume.

scrierea unui singur fișier cu coalesce

de asemenea, putem folosi coalesce(1) pentru a scrie un singur fișier.

df .coalesce(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-coalesce")

Iată ce este scos.

Documents/ tmp/ one-file-coalesce/ _SUCCESS part-00000-c7521799-e6d8-498d-b857-2aba7f56533a-c000.csv

coalesce nu ne permite să setați un anumit nume de fișier, fie (doar să ne personaliza numele folderului). Va trebui să folosim spark-daria pentru a accesa o metodă care va afișa un singur fișier.

scrierea unui fișier cu un anumit nume

puteți utiliza funcția DariaWriters.writeSingleFile definită în spark-daria pentru a scrie un singur fișier cu un anumit nume de fișier.

iată codul care scrie conținutul unui cadru de date în fișierul ~/Documents/better/mydata.csv.

import com.github.mrpowers.spark.daria.sql.DariaWritersDariaWriters.writeSingleFile( df = df, format = "csv", sc = spark.sparkContext, tmpFolder = sys.env("HOME") + "/Documents/better/tmp", filename = sys.env("HOME") + "/Documents/better/mydata.csv")

metoda writeSingleFile vă permite să denumiți fișierul fără să vă faceți griji cu privire la detaliile complicate de implementare.

writeSingleFile este utilizează repartition(1) și Hadoop metode de sistem de fișiere sub capota. Toate metodele sistemului de fișiere Hadoop sunt disponibile în orice mediu de rulare Spark – nu este nevoie să atașați borcane separate.

compatibilitate cu alte sisteme de fișiere

cel mai bine este să utilizați metodele sistemului de fișiere Hadoop atunci când mutați, redenumiți sau ștergeți fișiere, astfel încât codul dvs. să funcționeze pe mai multe platforme. writeSingleFile funcționează pe sistemul de fișiere local și în S3. Puteți utiliza această abordare atunci când rulați Spark local sau într-un notebook Databricks.

există și alte soluții la această problemă care nu sunt cross platform. Există soluții care funcționează numai în notebook-urile Databricks sau funcționează numai în S3 sau funcționează numai pe un sistem de operare asemănător Unix.

metodele sistemului de fișiere Hadoop sunt stângace de lucrat, dar cea mai bună opțiune pentru că funcționează pe mai multe platforme.

metoda writeSingleFile folosește metoda Hadoop fs.rename(), așa cum este descris în acest răspuns. Iată codul psuedocode:

val src = new Path("s3a://bucket/data/src")val dest = new Path("s3a://bucket/data/dest")val conf = sc.hadoopConfiguration // assuming sc = spark contextval fs = src.getFileSystem(conf)fs.rename(src, dest)

copyMerge

Hadoop 2 are o metodă FileUtil.copyMerge() care este o soluție elegantă la această problemă, dar această metodă este învechită și va fi eliminată în Hadoop 3. Există un răspuns în acest thread care reimplementează copyMerge pentru utilizatorii Hadoop 3.

în orice caz, nu scrieți cod care se bazează pe metoda FileUtil.copyMerge(). Știm că metoda va fi inaccesibilă atunci când upgrade-urile Spark la Hadoop 3 și nu doriți să vă bazați pe o metodă învechită care se va rupe la un moment necunoscut în viitor.

pașii următori

de obicei veți dori să scrieți Mai multe fișiere în paralel, dar în rarele ocazii când doriți să scrieți un singur fișier, metoda spark-Daria writeSingleFile vă va ajuta.

încercați tot posibilul să înfășurați logica complexă a sistemului de fișiere Hadoop în metode de ajutor care sunt testate separate. Combinarea operațiunilor sistemului de fișiere Hadoop și a Codului Spark în aceeași metodă va face codul dvs. prea complex.

You might also like

Lasă un răspuns

Adresa ta de email nu va fi publicată.