tässä blogissa kerrotaan, miten datakehys kirjoitetaan yhteen tiedostoon Sparkilla. Siinä kuvataan myös, miten tietyllä nimellä varustettuun tiedostoon kirjoitetaan tietoja, mikä on yllättävän haastavaa.
yksittäisen tiedoston kirjoittaminen Sparkilla ei ole tyypillistä. Spark on suunniteltu kirjoittamaan useita tiedostoja rinnakkain. Monien tiedostojen kirjoittaminen samaan aikaan on nopeampaa isoille tietokokonaisuuksille.
oletuskäyttäytyminen
luodaan datakehys, käytetään repartition(3)
luomaan kolme muistiosiota ja kirjoitetaan sitten tiedosto levylle.
val df = Seq("one", "two", "three").toDF("num")df .repartition(3) .write.csv(sys.env("HOME")+ "/Documents/tmp/some-files")
tässä levyllä syntyvät tiedostot.
Documents/ tmp/ some-files/ _SUCCESS part-00000-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00001-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00002-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv
Spark kirjoittaa yhden tiedoston jokaista muistiosiota kohden. repartition(3)
: n avulla luotiin kolme muistiosiota, joten kirjoitettiin kolme tiedostoa.
yhden tiedoston kirjoittaminen uudelleen
voimme käyttää repartition(1)
kirjoittaa yhden tiedoston.
df .repartition(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-repartition")
tässä levylle kirjoitettu tiedosto.
Documents/ tmp/ one-file-repartition/ _SUCCESS part-00000-d5a15f40-e787-4fd2-b8eb-c810d973b6fe-c000.csv
kirjoitetun tiedoston nimeä ei voi hallita. Voimme hallita hakemiston nimeä, mutta emme itse tiedostoa.
tämä ratkaisu ei ole riittävä, kun haluat kirjoittaa tietoja tiedostoon, jolla on tietty nimi.
yhden tiedoston kirjoittaminen koalitiolla
voidaan käyttää myös coalesce(1)
yhden tiedoston kirjoittamiseen.
df .coalesce(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-coalesce")
tässä se, mikä on lähtenyt.
Documents/ tmp/ one-file-coalesce/ _SUCCESS part-00000-c7521799-e6d8-498d-b857-2aba7f56533a-c000.csv
coalesce ei anna meidän asettaa tiettyä tiedostonimeä joko (se vain meidän muokata kansion nimi). Meidän täytyy käyttää spark-Dariaa päästäksesi menetelmään, joka tuottaa yhden tiedoston.
kirjoitettaessa tiedostoa, jolla on tietty nimi
voit käyttää spark-dariassa määriteltyä DariaWriters.writeSingleFile
funktiota kirjoittaaksesi yksittäisen tiedoston tietyllä tiedostonimellä.
tässä on koodi, joka kirjoittaa datakehyksen sisällön ~/Documents/better/mydata.csv
– tiedostoon.
import com.github.mrpowers.spark.daria.sql.DariaWritersDariaWriters.writeSingleFile( df = df, format = "csv", sc = spark.sparkContext, tmpFolder = sys.env("HOME") + "/Documents/better/tmp", filename = sys.env("HOME") + "/Documents/better/mydata.csv")
writeSingleFile
menetelmä anna tiedoston nimi murehtimatta monimutkaisia toteutuksen yksityiskohtia.
writeSingleFile
is käyttää repartition(1)
ja Hadoop-tiedostojärjestelmämenetelmiä konepellin alla. Kaikki Hadoop-tiedostojärjestelmämenetelmät ovat saatavilla missä tahansa Spark runtime-ympäristössä-sinun ei tarvitse liittää erillisiä purkkeja.
yhteensopivuus muiden tiedostojärjestelmien kanssa
on parasta käyttää Hadoop-tiedostojärjestelmämenetelmiä tiedostoja siirrettäessä, uudelleennimettäessä tai poistettaessa, joten koodisi toimii useilla alustoilla. writeSingleFile
toimii paikallisessa tiedostojärjestelmässä ja S3: ssa. Voit käyttää tätä lähestymistapaa ajaessasi Sparkia paikallisesti tai Databricks-muistikirjassa.
tähän ongelmaan on muitakin ratkaisuja, jotka eivät ole ristikkäisiä. On olemassa ratkaisuja, jotka toimivat vain Databricksin kannettavissa tietokoneissa tai toimivat vain S3: ssa tai toimivat vain Unixin kaltaisessa käyttöjärjestelmässä.
Hadoop-tiedostojärjestelmämenetelmät ovat kömpelöitä käyttää, mutta paras vaihtoehto aiheuttaa sen, että ne toimivat useilla alustoilla.
writeSingleFile
menetelmässä käytetään fs.rename()
Hadoop-menetelmää, kuten tässä vastauksessa kuvataan. Tässä on psuedokoodi:
val src = new Path("s3a://bucket/data/src")val dest = new Path("s3a://bucket/data/dest")val conf = sc.hadoopConfiguration // assuming sc = spark contextval fs = src.getFileSystem(conf)fs.rename(src, dest)
kopymerge
Hadoop 2: ssa on FileUtil.copyMerge()
menetelmä, joka on tyylikäs ratkaisu tähän ongelmaan, mutta tämä menetelmä on vanhentunut ja se poistetaan Hadoop 3: ssa. Tässä viestiketjussa on vastaus, että Hadoop 3: n käyttäjille reimplements copyMerge
.
älä missään tapauksessa kirjoita koodia, joka perustuu FileUtil.copyMerge()
– menetelmään. Tiedämme, että menetelmä on saavuttamattomissa, kun kipinä päivityksiä Hadoop 3 ja et halua luottaa vanhentunut menetelmä, joka rikkoutuu jossain tuntematon aika tulevaisuudessa.
seuraavat vaiheet
haluat tyypillisesti kirjoittaa useita tiedostoja rinnakkain, mutta harvoissa tapauksissa, kun haluat kirjoittaa yhden tiedoston, spark-daria writeSingleFile
– menetelmä auttaa.
yritä parhaasi mukaan paketoida monimutkainen Hadoop-tiedostojärjestelmän logiikka helper-menetelmiin, jotka testataan erillään. Hadoop-tiedostojärjestelmän toimintojen ja Spark-koodin yhdistäminen samassa menetelmässä tekee koodistasi liian monimutkaisen.