MungingData

Ce blog explique comment écrire une trame de données dans un seul fichier avec Spark. Il décrit également comment écrire des données dans un fichier avec un nom spécifique, ce qui est étonnamment difficile.

Écrire un seul fichier avec Spark n’est pas typique. Spark est conçu pour écrire plusieurs fichiers en parallèle. L’écriture simultanée de plusieurs fichiers est plus rapide pour les grands ensembles de données.

Comportement par défaut

Créons une trame de données, utilisons repartition(3) pour créer trois partitions de mémoire, puis écrivons le fichier sur le disque.

val df = Seq("one", "two", "three").toDF("num")df .repartition(3) .write.csv(sys.env("HOME")+ "/Documents/tmp/some-files")

Voici les fichiers générés sur le disque.

Documents/ tmp/ some-files/ _SUCCESS part-00000-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00001-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00002-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv

Spark écrit un fichier par partition mémoire. Nous avons utilisé repartition(3) pour créer trois partitions de mémoire, donc trois fichiers ont été écrits.

Écriture d’un fichier avec repartition

Nous pouvons utiliser repartition(1) écrire un seul fichier.

df .repartition(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-repartition")

Voici le fichier qui est écrit sur le disque.

Documents/ tmp/ one-file-repartition/ _SUCCESS part-00000-d5a15f40-e787-4fd2-b8eb-c810d973b6fe-c000.csv

Nous ne pouvons pas contrôler le nom du fichier écrit. Nous pouvons contrôler le nom du répertoire, mais pas le fichier lui-même.

Cette solution n’est pas suffisante lorsque vous souhaitez écrire des données dans un fichier avec un nom spécifique.

Écriture d’un seul fichier avec coalesce

Nous pouvons également utiliser coalesce(1) pour écrire un seul fichier.

df .coalesce(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-coalesce")

Voici ce qui est produit.

Documents/ tmp/ one-file-coalesce/ _SUCCESS part-00000-c7521799-e6d8-498d-b857-2aba7f56533a-c000.csv

coalesce ne nous permet pas non plus de définir un nom de fichier spécifique (il nous permet seulement de personnaliser le nom du dossier). Nous devrons utiliser spark-daria pour accéder à une méthode qui produira un seul fichier.

Écriture d’un fichier avec un nom spécifique

Vous pouvez utiliser la fonction DariaWriters.writeSingleFile définie dans spark-daria pour écrire un seul fichier avec un nom de fichier spécifique.

Voici le code qui écrit le contenu d’une trame de données dans le fichier ~/Documents/better/mydata.csv.

import com.github.mrpowers.spark.daria.sql.DariaWritersDariaWriters.writeSingleFile( df = df, format = "csv", sc = spark.sparkContext, tmpFolder = sys.env("HOME") + "/Documents/better/tmp", filename = sys.env("HOME") + "/Documents/better/mydata.csv")

La méthode writeSingleFile vous permet de nommer le fichier sans vous soucier des détails d’implémentation compliqués.

writeSingleFile utilise repartition(1) et les méthodes du système de fichiers Hadoop sous le capot. Toutes les méthodes du système de fichiers Hadoop sont disponibles dans n’importe quel environnement d’exécution Spark – vous n’avez pas besoin d’attacher de fichiers JAR séparés.

Compatibilité avec d’autres systèmes de fichiers

Il est préférable d’utiliser les méthodes du système de fichiers Hadoop lors du déplacement, du renommage ou de la suppression de fichiers, afin que votre code fonctionne sur plusieurs plates-formes. writeSingleFile fonctionne sur votre système de fichiers local et dans S3. Vous pouvez utiliser cette approche lors de l’exécution de Spark localement ou dans un bloc-notes Databricks.

Il existe d’autres solutions à ce problème qui ne sont pas multiplateformes. Il existe des solutions qui ne fonctionnent que dans les blocs-notes Databricks, ou ne fonctionnent que dans S3, ou ne fonctionnent que sur un système d’exploitation de type Unix.

Les méthodes du système de fichiers Hadoop sont maladroites à utiliser, mais la meilleure option est qu’elles fonctionnent sur plusieurs plates-formes.

La méthode writeSingleFile utilise la méthode Hadoop fs.rename(), comme décrit dans cette réponse. Voici le code d’alimentation:

val src = new Path("s3a://bucket/data/src")val dest = new Path("s3a://bucket/data/dest")val conf = sc.hadoopConfiguration // assuming sc = spark contextval fs = src.getFileSystem(conf)fs.rename(src, dest)

copyMerge

Hadoop 2 a une méthode FileUtil.copyMerge() qui est une solution élégante à ce problème, mais cette méthode est obsolète et sera supprimée dans Hadoop 3. Il y a une réponse dans ce thread qui réimplante copyMerge pour les utilisateurs de Hadoop 3.

Dans tous les cas, n’écrivez pas de code qui repose sur la méthode FileUtil.copyMerge(). Nous savons que cette méthode sera inaccessible lorsque Spark mettra à niveau Hadoop 3 et que vous ne voulez pas compter sur une méthode obsolète qui se cassera à un moment inconnu dans le futur.

Étapes suivantes

Vous voudrez généralement écrire plusieurs fichiers en parallèle, mais dans les rares occasions où vous souhaitez écrire un seul fichier, la méthode spark-daria writeSingleFile vous aidera.

Faites de votre mieux pour envelopper la logique complexe du système de fichiers Hadoop dans des méthodes auxiliaires qui sont testées séparées. La combinaison des opérations du système de fichiers Hadoop et du code Spark dans la même méthode rendra votre code trop complexe.

You might also like

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée.