MungingData

Este blog explica cómo escribir un DataFrame en un solo archivo con Spark. También describe cómo escribir datos en un archivo con un nombre específico, lo cual es sorprendentemente desafiante.

Escribir un solo archivo con Spark no es típico. Spark está diseñado para escribir varios archivos en paralelo. Escribir muchos archivos al mismo tiempo es más rápido para grandes conjuntos de datos.

Comportamiento predeterminado

Vamos a crear un DataFrame, usar repartition(3) para crear tres particiones de memoria y, a continuación, escribir el archivo en el disco.

val df = Seq("one", "two", "three").toDF("num")df .repartition(3) .write.csv(sys.env("HOME")+ "/Documents/tmp/some-files")

Aquí están los archivos que se generan en el disco.

Documents/ tmp/ some-files/ _SUCCESS part-00000-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00001-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00002-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv

Spark escribe un archivo por partición de memoria. Usamos repartition(3) para crear tres particiones de memoria, por lo que se escribieron tres archivos.

Escribir un archivo con repartition

podemos usar repartition(1) escribir un solo archivo.

df .repartition(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-repartition")

Aquí está el archivo que se escribe en el disco.

Documents/ tmp/ one-file-repartition/ _SUCCESS part-00000-d5a15f40-e787-4fd2-b8eb-c810d973b6fe-c000.csv

no podemos controlar el nombre del archivo que está escrito. Podemos controlar el nombre del directorio, pero no el archivo en sí.

Esta solución no es suficiente cuando desea escribir datos en un archivo con un nombre específico.

Escribir un archivo único con unen

también podemos usar coalesce(1) para escribir un solo archivo.

df .coalesce(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-coalesce")

Esto es lo que se genera.

Documents/ tmp/ one-file-coalesce/ _SUCCESS part-00000-c7521799-e6d8-498d-b857-2aba7f56533a-c000.csv

coalesce tampoco nos permite establecer un nombre de archivo específico (solo nos permite personalizar el nombre de la carpeta). Necesitaremos usar spark-daria para acceder a un método que genere un solo archivo.

Escribir un archivo con un nombre específico

Puede usar la función DariaWriters.writeSingleFile definida en spark-daria para escribir un solo archivo con un nombre de archivo específico.

Aquí está el código que escribe el contenido de un DataFrame en el archivo ~/Documents/better/mydata.csv.

import com.github.mrpowers.spark.daria.sql.DariaWritersDariaWriters.writeSingleFile( df = df, format = "csv", sc = spark.sparkContext, tmpFolder = sys.env("HOME") + "/Documents/better/tmp", filename = sys.env("HOME") + "/Documents/better/mydata.csv")

El método writeSingleFile le permite nombrar el archivo sin preocuparse por detalles de implementación complicados.

writeSingleFile utiliza repartition(1) y métodos de sistema de archivos Hadoop debajo del capó. Todos los métodos del sistema de archivos Hadoop están disponibles en cualquier entorno de ejecución de Spark, no es necesario adjuntar ningún tarro separado.

Compatibilidad con otros sistemas de archivos

Es mejor usar los métodos del sistema de archivos Hadoop al mover, renombrar o eliminar archivos, para que su código funcione en múltiples plataformas. writeSingleFile funciona en su sistema de archivos local y en S3. Puede utilizar este enfoque al ejecutar Spark localmente o en un cuaderno de Databricks.

Hay otras soluciones para este problema que no son multiplataforma. Hay soluciones que solo funcionan en cuadernos de Databricks, o solo funcionan en S3, o solo funcionan en un sistema operativo tipo Unix.

Los métodos del sistema de archivos de Hadoop son torpes para trabajar, pero la mejor opción es que funcionan en múltiples plataformas.

El método writeSingleFile utiliza el método fs.rename() Hadoop, como se describe en esta respuesta. Aquí está el código psuedocode:

val src = new Path("s3a://bucket/data/src")val dest = new Path("s3a://bucket/data/dest")val conf = sc.hadoopConfiguration // assuming sc = spark contextval fs = src.getFileSystem(conf)fs.rename(src, dest)

copyMerge

Hadoop 2 tiene un método FileUtil.copyMerge() que es una solución elegante para este problema, pero este método está obsoleto y se eliminará en Hadoop 3. Hay una respuesta en este hilo que reimplanta copyMerge para usuarios de Hadoop 3.

En cualquier caso, no escriba código que dependa del método FileUtil.copyMerge(). Sabemos que ese método será inaccesible cuando Spark actualice a Hadoop 3 y no querrás depender de un método obsoleto que se rompa en un momento desconocido en el futuro.

Pasos siguientes

Normalmente querrá escribir varios archivos en paralelo, pero en las raras ocasiones en que desee escribir un solo archivo, el método spark-daria writeSingleFile le ayudará.

Haga todo lo posible para envolver la lógica compleja del sistema de archivos de Hadoop en métodos auxiliares que se prueban separados. Combinar las operaciones del sistema de archivos Hadoop y el código de Spark en el mismo método hará que su código sea demasiado complejo.

You might also like

Deja una respuesta

Tu dirección de correo electrónico no será publicada.