MungingData

tässä blogissa kerrotaan, miten datakehys kirjoitetaan yhteen tiedostoon Sparkilla. Siinä kuvataan myös, miten tietyllä nimellä varustettuun tiedostoon kirjoitetaan tietoja, mikä on yllättävän haastavaa.

yksittäisen tiedoston kirjoittaminen Sparkilla ei ole tyypillistä. Spark on suunniteltu kirjoittamaan useita tiedostoja rinnakkain. Monien tiedostojen kirjoittaminen samaan aikaan on nopeampaa isoille tietokokonaisuuksille.

oletuskäyttäytyminen

luodaan datakehys, käytetään repartition(3) luomaan kolme muistiosiota ja kirjoitetaan sitten tiedosto levylle.

val df = Seq("one", "two", "three").toDF("num")df .repartition(3) .write.csv(sys.env("HOME")+ "/Documents/tmp/some-files")

tässä levyllä syntyvät tiedostot.

Documents/ tmp/ some-files/ _SUCCESS part-00000-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00001-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00002-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv

Spark kirjoittaa yhden tiedoston jokaista muistiosiota kohden. repartition(3): n avulla luotiin kolme muistiosiota, joten kirjoitettiin kolme tiedostoa.

yhden tiedoston kirjoittaminen uudelleen

voimme käyttää repartition(1) kirjoittaa yhden tiedoston.

df .repartition(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-repartition")

tässä levylle kirjoitettu tiedosto.

Documents/ tmp/ one-file-repartition/ _SUCCESS part-00000-d5a15f40-e787-4fd2-b8eb-c810d973b6fe-c000.csv

kirjoitetun tiedoston nimeä ei voi hallita. Voimme hallita hakemiston nimeä, mutta emme itse tiedostoa.

tämä ratkaisu ei ole riittävä, kun haluat kirjoittaa tietoja tiedostoon, jolla on tietty nimi.

yhden tiedoston kirjoittaminen koalitiolla

voidaan käyttää myös coalesce(1) yhden tiedoston kirjoittamiseen.

df .coalesce(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-coalesce")

tässä se, mikä on lähtenyt.

Documents/ tmp/ one-file-coalesce/ _SUCCESS part-00000-c7521799-e6d8-498d-b857-2aba7f56533a-c000.csv

coalesce ei anna meidän asettaa tiettyä tiedostonimeä joko (se vain meidän muokata kansion nimi). Meidän täytyy käyttää spark-Dariaa päästäksesi menetelmään, joka tuottaa yhden tiedoston.

kirjoitettaessa tiedostoa, jolla on tietty nimi

voit käyttää spark-dariassa määriteltyä DariaWriters.writeSingleFile funktiota kirjoittaaksesi yksittäisen tiedoston tietyllä tiedostonimellä.

tässä on koodi, joka kirjoittaa datakehyksen sisällön ~/Documents/better/mydata.csv – tiedostoon.

import com.github.mrpowers.spark.daria.sql.DariaWritersDariaWriters.writeSingleFile( df = df, format = "csv", sc = spark.sparkContext, tmpFolder = sys.env("HOME") + "/Documents/better/tmp", filename = sys.env("HOME") + "/Documents/better/mydata.csv")

writeSingleFile menetelmä anna tiedoston nimi murehtimatta monimutkaisia toteutuksen yksityiskohtia.

writeSingleFile is käyttää repartition(1) ja Hadoop-tiedostojärjestelmämenetelmiä konepellin alla. Kaikki Hadoop-tiedostojärjestelmämenetelmät ovat saatavilla missä tahansa Spark runtime-ympäristössä-sinun ei tarvitse liittää erillisiä purkkeja.

yhteensopivuus muiden tiedostojärjestelmien kanssa

on parasta käyttää Hadoop-tiedostojärjestelmämenetelmiä tiedostoja siirrettäessä, uudelleennimettäessä tai poistettaessa, joten koodisi toimii useilla alustoilla. writeSingleFile toimii paikallisessa tiedostojärjestelmässä ja S3: ssa. Voit käyttää tätä lähestymistapaa ajaessasi Sparkia paikallisesti tai Databricks-muistikirjassa.

tähän ongelmaan on muitakin ratkaisuja, jotka eivät ole ristikkäisiä. On olemassa ratkaisuja, jotka toimivat vain Databricksin kannettavissa tietokoneissa tai toimivat vain S3: ssa tai toimivat vain Unixin kaltaisessa käyttöjärjestelmässä.

Hadoop-tiedostojärjestelmämenetelmät ovat kömpelöitä käyttää, mutta paras vaihtoehto aiheuttaa sen, että ne toimivat useilla alustoilla.

writeSingleFile menetelmässä käytetään fs.rename() Hadoop-menetelmää, kuten tässä vastauksessa kuvataan. Tässä on psuedokoodi:

val src = new Path("s3a://bucket/data/src")val dest = new Path("s3a://bucket/data/dest")val conf = sc.hadoopConfiguration // assuming sc = spark contextval fs = src.getFileSystem(conf)fs.rename(src, dest)

kopymerge

Hadoop 2: ssa on FileUtil.copyMerge() menetelmä, joka on tyylikäs ratkaisu tähän ongelmaan, mutta tämä menetelmä on vanhentunut ja se poistetaan Hadoop 3: ssa. Tässä viestiketjussa on vastaus, että Hadoop 3: n käyttäjille reimplements copyMerge.

älä missään tapauksessa kirjoita koodia, joka perustuu FileUtil.copyMerge() – menetelmään. Tiedämme, että menetelmä on saavuttamattomissa, kun kipinä päivityksiä Hadoop 3 ja et halua luottaa vanhentunut menetelmä, joka rikkoutuu jossain tuntematon aika tulevaisuudessa.

seuraavat vaiheet

haluat tyypillisesti kirjoittaa useita tiedostoja rinnakkain, mutta harvoissa tapauksissa, kun haluat kirjoittaa yhden tiedoston, spark-daria writeSingleFile – menetelmä auttaa.

yritä parhaasi mukaan paketoida monimutkainen Hadoop-tiedostojärjestelmän logiikka helper-menetelmiin, jotka testataan erillään. Hadoop-tiedostojärjestelmän toimintojen ja Spark-koodin yhdistäminen samassa menetelmässä tekee koodistasi liian monimutkaisen.

You might also like

Vastaa

Sähköpostiosoitettasi ei julkaista.