Questo blog spiega come scrivere un DataFrame in un singolo file con Spark. Descrive anche come scrivere i dati in un file con un nome specifico, che è sorprendentemente impegnativo.
Scrivere un singolo file con Spark non è tipico. Spark è progettato per scrivere più file in parallelo. Scrivere molti file allo stesso tempo è più veloce per i grandi set di dati.
Comportamento predefinito
Creiamo un DataFrame, usiamo repartition(3)
per creare tre partizioni di memoria e quindi scriviamo il file su disco.
val df = Seq("one", "two", "three").toDF("num")df .repartition(3) .write.csv(sys.env("HOME")+ "/Documents/tmp/some-files")
Ecco i file generati sul disco.
Documents/ tmp/ some-files/ _SUCCESS part-00000-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00001-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00002-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv
Spark scrive un file per partizione di memoria. Abbiamo usato repartition(3)
per creare tre partizioni di memoria, quindi sono stati scritti tre file.
Scrivere un file con ripartizione
Possiamo usare repartition(1)
scrivere un singolo file.
df .repartition(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-repartition")
Ecco il file scritto su disco.
Documents/ tmp/ one-file-repartition/ _SUCCESS part-00000-d5a15f40-e787-4fd2-b8eb-c810d973b6fe-c000.csv
Non possiamo controllare il nome del file che è stato scritto. Possiamo controllare il nome della directory, ma non il file stesso.
Questa soluzione non è sufficiente quando si desidera scrivere dati in un file con un nome specifico.
Scrivere un singolo file con coalesce
Possiamo anche usare coalesce(1)
per scrivere un singolo file.
df .coalesce(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-coalesce")
Ecco cosa viene emesso.
Documents/ tmp/ one-file-coalesce/ _SUCCESS part-00000-c7521799-e6d8-498d-b857-2aba7f56533a-c000.csv
coalesce non ci consente di impostare un nome file specifico (ci consente solo di personalizzare il nome della cartella). Dovremo usare spark-daria per accedere a un metodo che produrrà un singolo file.
Scrivere un file con un nome specifico
È possibile utilizzare la funzione DariaWriters.writeSingleFile
definita in spark-daria per scrivere un singolo file con un nome specifico.
Ecco il codice che scrive il contenuto di un DataFrame nel file ~/Documents/better/mydata.csv
.
import com.github.mrpowers.spark.daria.sql.DariaWritersDariaWriters.writeSingleFile( df = df, format = "csv", sc = spark.sparkContext, tmpFolder = sys.env("HOME") + "/Documents/better/tmp", filename = sys.env("HOME") + "/Documents/better/mydata.csv")
Il metodo writeSingleFile
consente di assegnare un nome al file senza preoccuparsi di complicati dettagli di implementazione.
writeSingleFile
is usa repartition(1)
e i metodi del filesystem Hadoop sotto il cofano. Tutti i metodi del filesystem Hadoop sono disponibili in qualsiasi ambiente di runtime Spark-non è necessario collegare JAR separati.
Compatibilità con altri filesystem
È preferibile utilizzare i metodi del filesystem Hadoop durante lo spostamento, la ridenominazione o l’eliminazione di file, in modo che il codice funzioni su più piattaforme. writeSingleFile
funziona sul tuo filesystem locale e in S3. È possibile utilizzare questo approccio quando si esegue Spark localmente o in un notebook Databricks.
Esistono altre soluzioni a questo problema che non sono multipiattaforma. Esistono soluzioni che funzionano solo nei notebook Databricks o funzionano solo in S3 o funzionano solo su un sistema operativo simile a Unix.
I metodi del filesystem Hadoop sono goffi da usare, ma l’opzione migliore è che funzionano su più piattaforme.
Il metodo writeSingleFile
utilizza il metodo Hadoop fs.rename()
, come descritto in questa risposta. Ecco il psuedocode:
val src = new Path("s3a://bucket/data/src")val dest = new Path("s3a://bucket/data/dest")val conf = sc.hadoopConfiguration // assuming sc = spark contextval fs = src.getFileSystem(conf)fs.rename(src, dest)
copyMerge
Hadoop 2 ha un metodo FileUtil.copyMerge()
che è una soluzione elegante a questo problema, ma questo metodo è deprecato e verrà rimosso in Hadoop 3. C’è una risposta in questo thread che reimplementa copyMerge
per gli utenti Hadoop 3.
In ogni caso, non scrivere codice che si basa sul metodo FileUtil.copyMerge()
. Sappiamo che il metodo sarà inaccessibile quando Spark si aggiorna a Hadoop 3 e non si vuole fare affidamento su un metodo deprecato che si interromperà in un momento sconosciuto in futuro.
Passaggi successivi
In genere si desidera scrivere più file in parallelo, ma nelle rare occasioni in cui si desidera scrivere un singolo file, il metodo spark-daria writeSingleFile
aiuterà.
Fai del tuo meglio per avvolgere la complessa logica del filesystem Hadoop in metodi di supporto testati separati. La combinazione delle operazioni del filesystem Hadoop e del codice Spark nello stesso metodo renderà il tuo codice troppo complesso.