MungingData

deze blog legt uit hoe je een DataFrame naar een enkel bestand met Spark kunt schrijven. Het beschrijft ook hoe je gegevens kunt schrijven in een bestand met een specifieke naam, wat verrassend uitdagend is.

het schrijven van een enkel bestand met Spark is niet typisch. Spark is ontworpen om meerdere bestanden parallel te schrijven. Veel bestanden tegelijkertijd schrijven is sneller voor grote datasets.

Default behaviour

laten we een DataFrame maken, gebruik repartition(3) om drie geheugenpartities aan te maken, en schrijf het bestand vervolgens uit naar de schijf.

val df = Seq("one", "two", "three").toDF("num")df .repartition(3) .write.csv(sys.env("HOME")+ "/Documents/tmp/some-files")

hier zijn de bestanden die op de schijf worden gegenereerd.

Documents/ tmp/ some-files/ _SUCCESS part-00000-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00001-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00002-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv

Spark schrijft één bestand per geheugenpartitie uit. We gebruikten repartition(3) om drie geheugenpartities aan te maken, dus werden er drie bestanden geschreven.

een bestand uitschrijven met herpartitie

we kunnen repartition(1) een enkel bestand uitschrijven.

df .repartition(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-repartition")

hier is het bestand dat naar schijf geschreven is.

Documents/ tmp/ one-file-repartition/ _SUCCESS part-00000-d5a15f40-e787-4fd2-b8eb-c810d973b6fe-c000.csv

We kunnen de naam van het geschreven bestand niet controleren. We kunnen de naam van de directory controleren, maar niet het bestand zelf.

deze oplossing is niet voldoende als u gegevens wilt schrijven naar een bestand met een specifieke naam.

schrijven van een enkel bestand met coalesce

we kunnen ook coalesce(1) gebruiken om een enkel bestand uit te schrijven.

df .coalesce(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-coalesce")

Dit is wat wordt uitgevoerd.

Documents/ tmp/ one-file-coalesce/ _SUCCESS part-00000-c7521799-e6d8-498d-b857-2aba7f56533a-c000.csv

coalesce laat ons ook geen specifieke bestandsnaam instellen (Het laat ons alleen de mapnaam aanpassen). We moeten spark-daria gebruiken om toegang te krijgen tot een methode die een enkel bestand oplevert.

het schrijven van een bestand met een specifieke naam

u kunt de DariaWriters.writeSingleFile functie gedefinieerd in spark-daria gebruiken om een enkel bestand met een specifieke bestandsnaam uit te schrijven.

hier is de code die de inhoud van een DataFrame naar het ~/Documents/better/mydata.csv bestand schrijft.

import com.github.mrpowers.spark.daria.sql.DariaWritersDariaWriters.writeSingleFile( df = df, format = "csv", sc = spark.sparkContext, tmpFolder = sys.env("HOME") + "/Documents/better/tmp", filename = sys.env("HOME") + "/Documents/better/mydata.csv")

met de writeSingleFile methode kunt u het bestand een naam geven zonder u zorgen te maken over ingewikkelde implementatiedetails.

writeSingleFile is gebruikt repartition(1) en Hadoop bestandssysteem methoden onder de hood. Alle methoden van het Hadoop-bestandssysteem zijn beschikbaar in elke Spark Runtime-Omgeving – u hoeft geen afzonderlijke potten aan te sluiten.

compatibiliteit met andere bestandssystemen

het is het beste om de methoden van het Hadoop-bestandssysteem te gebruiken bij het verplaatsen, hernoemen of verwijderen van bestanden, zodat uw code op meerdere platforms werkt. writeSingleFile werkt op uw lokale bestandssysteem en in S3. U kunt deze aanpak gebruiken bij het uitvoeren van Spark lokaal of in een Databricks notebook.

er zijn andere oplossingen voor dit probleem die niet platformoverschrijdend zijn. Er zijn oplossingen die alleen werken in Databricks-notebooks, of alleen werken in S3, of alleen werken op een Unix-achtig besturingssysteem.

de Hadoop bestandssysteem methoden zijn onhandig om mee te werken, maar de beste optie omdat ze werken op meerdere platforms.

de writeSingleFile methode gebruikt de fs.rename() Hadoop methode, zoals beschreven in dit antwoord. Hier is de psuedocode:

val src = new Path("s3a://bucket/data/src")val dest = new Path("s3a://bucket/data/dest")val conf = sc.hadoopConfiguration // assuming sc = spark contextval fs = src.getFileSystem(conf)fs.rename(src, dest)

copyMerge

Hadoop 2 heeft een FileUtil.copyMerge() methode die een elegante oplossing is voor dit probleem, maar deze methode is verouderd en zal worden verwijderd in Hadoop 3. Er is een antwoord in deze thread dat copyMerge herimplementeert voor Hadoop 3 gebruikers.

schrijf in ieder geval geen code die gebaseerd is op de FileUtil.copyMerge() methode. We weten dat de methode ontoegankelijk zal zijn wanneer Spark upgrades naar Hadoop 3 en je niet wilt vertrouwen op een verouderde methode die zal breken op een onbekend moment in de toekomst.

volgende stappen

gewoonlijk wilt u meerdere bestanden parallel schrijven, maar in de zeldzame gevallen waarin u een enkel bestand wilt schrijven, zal de spark-daria writeSingleFile methode helpen.

probeer de complexe logica van het Hadoop-bestandssysteem om te wikkelen in helper-methoden die gescheiden worden getest. Het combineren van Hadoop bestandssysteem operaties en Spark code in dezelfde methode zal uw code te complex maken.

You might also like

Geef een antwoord

Het e-mailadres wordt niet gepubliceerd.