MungingData

Tento blog vysvětluje, jak napsat Datovém do jediného souboru s Jiskrou. Popisuje také, jak zapisovat data do souboru s konkrétním názvem, což je překvapivě náročné.

vypsat jeden soubor se Spark není typické. Spark je navržen tak, aby zapisoval více souborů paralelně. Zapisování mnoha souborů současně je rychlejší pro velké datové sady.

Výchozí chování

Pojďme vytvořit Datovém repartition(3) vytvořit tři paměťové oddíly, a pak se zapsat soubor na disk.

val df = Seq("one", "two", "three").toDF("num")df .repartition(3) .write.csv(sys.env("HOME")+ "/Documents/tmp/some-files")

zde jsou soubory, které jsou generovány na disku.

Documents/ tmp/ some-files/ _SUCCESS part-00000-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00001-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00002-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv

Spark vypíše jeden soubor na paměťový oddíl. Použili jsme repartition(3) k vytvoření tří paměťových oddílů, takže byly zapsány tři soubory.

vypsání jednoho souboru s rozdělením

můžeme použít repartition(1) vypsat jeden soubor.

df .repartition(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-repartition")

zde je soubor, který je zapsán na disk.

Documents/ tmp/ one-file-repartition/ _SUCCESS part-00000-d5a15f40-e787-4fd2-b8eb-c810d973b6fe-c000.csv

nemůžeme ovládat název souboru, který je napsán. Můžeme ovládat název adresáře, ale ne samotný soubor.

toto řešení nestačí, pokud chcete zapisovat data do souboru s konkrétním názvem.

vypsání jednoho souboru s coalesce

můžeme také použít coalesce(1) napsat jeden soubor.

df .coalesce(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-coalesce")

zde je výstup.

Documents/ tmp/ one-file-coalesce/ _SUCCESS part-00000-c7521799-e6d8-498d-b857-2aba7f56533a-c000.csv

coalesce nám nedovolí nastavit konkrétní název souboru (přizpůsobme si pouze název složky). Budeme muset použít spark-daria pro přístup k metodě, která bude výstup jednoho souboru.

Psaní se souboru s určitým názvem

můžete použít DariaWriters.writeSingleFile funkce definované v spark-daria napsat jeden soubor s určitým názvem.

zde je kód, který zapisuje obsah datového rámce do souboru ~/Documents/better/mydata.csv.

import com.github.mrpowers.spark.daria.sql.DariaWritersDariaWriters.writeSingleFile( df = df, format = "csv", sc = spark.sparkContext, tmpFolder = sys.env("HOME") + "/Documents/better/tmp", filename = sys.env("HOME") + "/Documents/better/mydata.csv")

metoda writeSingleFile vám umožní pojmenovat soubor bez obav o složité detaily implementace.

writeSingleFile používá repartition(1) a Hadoop metody souborového systému pod kapotou. Všechny metody souborového systému Hadoop jsou k dispozici v jakémkoli prostředí Spark runtime – nemusíte připojovat žádné samostatné nádoby.

Kompatibilita s jinými filesystémy

To je nejlepší použít souborový systém Hadoop metody při přesouvání, přejmenování či mazání souborů, takže váš kód bude fungovat na více platformách. writeSingleFile pracuje na místním souborovém systému a v S3. Tento přístup můžete použít při lokálním spuštění Spark nebo v notebooku Databricks.

existují i jiná řešení tohoto problému, která nejsou multiplatformní. Existují řešení, která pracují pouze v noteboocích Databricks nebo pracují pouze v S3 nebo pracují pouze na operačním systému Unix.

metody souborového systému Hadoop jsou nemotorné, ale nejlepší volbou je, že pracují na více platformách.

metoda writeSingleFile používá metodu fs.rename() Hadoop, jak je popsáno v této odpovědi. Zde je psuedocode:

val src = new Path("s3a://bucket/data/src")val dest = new Path("s3a://bucket/data/dest")val conf = sc.hadoopConfiguration // assuming sc = spark contextval fs = src.getFileSystem(conf)fs.rename(src, dest)

copyMerge

Hadoop 2 má metodu FileUtil.copyMerge(), která je elegantním řešením tohoto problému, ale tato metoda je zastaralá a bude odstraněna v Hadoop 3. V tomto vlákně je odpověď, která reimplementuje copyMerge pro uživatele Hadoop 3.

v žádném případě nepište kód, který se spoléhá na metodu FileUtil.copyMerge(). Víme, že tato metoda bude nepřístupná, když Spark upgraduje na Hadoop 3 a nechcete se spoléhat na zastaralou metodu, která se v budoucnu zlomí v nějaké neznámé době.

Další kroky

obvykle Budete chtít vypsat více souborů paralelně, ale ve vzácných případech, když chcete napsat jeden soubor jiskra-daria writeSingleFile metoda pomůže.

snažte se co nejlépe zabalit komplexní logiku souborového systému Hadoop do pomocných metod, které jsou testovány odděleně. Kombinace operací souborového systému Hadoop a kódu Spark ve stejné metodě způsobí, že váš kód bude příliš složitý.

You might also like

Napsat komentář

Vaše e-mailová adresa nebude zveřejněna.