Ce blog explique comment écrire une trame de données dans un seul fichier avec Spark. Il décrit également comment écrire des données dans un fichier avec un nom spécifique, ce qui est étonnamment difficile.
Écrire un seul fichier avec Spark n’est pas typique. Spark est conçu pour écrire plusieurs fichiers en parallèle. L’écriture simultanée de plusieurs fichiers est plus rapide pour les grands ensembles de données.
Comportement par défaut
Créons une trame de données, utilisons repartition(3)
pour créer trois partitions de mémoire, puis écrivons le fichier sur le disque.
val df = Seq("one", "two", "three").toDF("num")df .repartition(3) .write.csv(sys.env("HOME")+ "/Documents/tmp/some-files")
Voici les fichiers générés sur le disque.
Documents/ tmp/ some-files/ _SUCCESS part-00000-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00001-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00002-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv
Spark écrit un fichier par partition mémoire. Nous avons utilisé repartition(3)
pour créer trois partitions de mémoire, donc trois fichiers ont été écrits.
Écriture d’un fichier avec repartition
Nous pouvons utiliser repartition(1)
écrire un seul fichier.
df .repartition(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-repartition")
Voici le fichier qui est écrit sur le disque.
Documents/ tmp/ one-file-repartition/ _SUCCESS part-00000-d5a15f40-e787-4fd2-b8eb-c810d973b6fe-c000.csv
Nous ne pouvons pas contrôler le nom du fichier écrit. Nous pouvons contrôler le nom du répertoire, mais pas le fichier lui-même.
Cette solution n’est pas suffisante lorsque vous souhaitez écrire des données dans un fichier avec un nom spécifique.
Écriture d’un seul fichier avec coalesce
Nous pouvons également utiliser coalesce(1)
pour écrire un seul fichier.
df .coalesce(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-coalesce")
Voici ce qui est produit.
Documents/ tmp/ one-file-coalesce/ _SUCCESS part-00000-c7521799-e6d8-498d-b857-2aba7f56533a-c000.csv
coalesce ne nous permet pas non plus de définir un nom de fichier spécifique (il nous permet seulement de personnaliser le nom du dossier). Nous devrons utiliser spark-daria pour accéder à une méthode qui produira un seul fichier.
Écriture d’un fichier avec un nom spécifique
Vous pouvez utiliser la fonction DariaWriters.writeSingleFile
définie dans spark-daria pour écrire un seul fichier avec un nom de fichier spécifique.
Voici le code qui écrit le contenu d’une trame de données dans le fichier ~/Documents/better/mydata.csv
.
import com.github.mrpowers.spark.daria.sql.DariaWritersDariaWriters.writeSingleFile( df = df, format = "csv", sc = spark.sparkContext, tmpFolder = sys.env("HOME") + "/Documents/better/tmp", filename = sys.env("HOME") + "/Documents/better/mydata.csv")
La méthode writeSingleFile
vous permet de nommer le fichier sans vous soucier des détails d’implémentation compliqués.
writeSingleFile
utilise repartition(1)
et les méthodes du système de fichiers Hadoop sous le capot. Toutes les méthodes du système de fichiers Hadoop sont disponibles dans n’importe quel environnement d’exécution Spark – vous n’avez pas besoin d’attacher de fichiers JAR séparés.
Compatibilité avec d’autres systèmes de fichiers
Il est préférable d’utiliser les méthodes du système de fichiers Hadoop lors du déplacement, du renommage ou de la suppression de fichiers, afin que votre code fonctionne sur plusieurs plates-formes. writeSingleFile
fonctionne sur votre système de fichiers local et dans S3. Vous pouvez utiliser cette approche lors de l’exécution de Spark localement ou dans un bloc-notes Databricks.
Il existe d’autres solutions à ce problème qui ne sont pas multiplateformes. Il existe des solutions qui ne fonctionnent que dans les blocs-notes Databricks, ou ne fonctionnent que dans S3, ou ne fonctionnent que sur un système d’exploitation de type Unix.
Les méthodes du système de fichiers Hadoop sont maladroites à utiliser, mais la meilleure option est qu’elles fonctionnent sur plusieurs plates-formes.
La méthode writeSingleFile
utilise la méthode Hadoop fs.rename()
, comme décrit dans cette réponse. Voici le code d’alimentation:
val src = new Path("s3a://bucket/data/src")val dest = new Path("s3a://bucket/data/dest")val conf = sc.hadoopConfiguration // assuming sc = spark contextval fs = src.getFileSystem(conf)fs.rename(src, dest)
copyMerge
Hadoop 2 a une méthode FileUtil.copyMerge()
qui est une solution élégante à ce problème, mais cette méthode est obsolète et sera supprimée dans Hadoop 3. Il y a une réponse dans ce thread qui réimplante copyMerge
pour les utilisateurs de Hadoop 3.
Dans tous les cas, n’écrivez pas de code qui repose sur la méthode FileUtil.copyMerge()
. Nous savons que cette méthode sera inaccessible lorsque Spark mettra à niveau Hadoop 3 et que vous ne voulez pas compter sur une méthode obsolète qui se cassera à un moment inconnu dans le futur.
Étapes suivantes
Vous voudrez généralement écrire plusieurs fichiers en parallèle, mais dans les rares occasions où vous souhaitez écrire un seul fichier, la méthode spark-daria writeSingleFile
vous aidera.
Faites de votre mieux pour envelopper la logique complexe du système de fichiers Hadoop dans des méthodes auxiliaires qui sont testées séparées. La combinaison des opérations du système de fichiers Hadoop et du code Spark dans la même méthode rendra votre code trop complexe.