このブログでは、Sparkで単一のファイルにDataFrameを書き出す方法について説明します。 また、特定の名前のファイルにデータを書き込む方法についても説明していますが、これは驚くほど困難です。
Sparkで単一のファイルを書き出すのは一般的ではありません。 Sparkは、複数のファイルを並行して書き出すように設計されています。 大きなデータセットでは、同時に多くのファイルを書き出す方が高速です。
デフォルトの動作
データフレームを作成し、repartition(3)
を使用して三つのメモリパーティションを作成し、ファイルをディスクに書き出しましょう。
val df = Seq("one", "two", "three").toDF("num")df .repartition(3) .write.csv(sys.env("HOME")+ "/Documents/tmp/some-files")
ディスク上に生成されるファイルは次のとおりです。
Documents/ tmp/ some-files/ _SUCCESS part-00000-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00001-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00002-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv
Sparkはメモリパーティションごとに一つのファイルを書き出します。 repartition(3)
を使用して三つのメモリパーティションを作成したので、三つのファイルが書き込まれました。
再分割で一つのファイルを書き出す
repartition(1)
を使用して一つのファイルを書き出すことができます。
df .repartition(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-repartition")
ディスクに書き込まれたファイルは次のとおりです。
Documents/ tmp/ one-file-repartition/ _SUCCESS part-00000-d5a15f40-e787-4fd2-b8eb-c810d973b6fe-c000.csv
書き込まれるファイルの名前を制御できません。 ディレクトリの名前は制御できますが、ファイル自体は制御できません。
特定の名前のファイルにデータを書きたい場合は、この解決策では不十分です。
coalesceで単一のファイルを書き出す
また、coalesce(1)
を使用して単一のファイルを書き出すこともできます。
df .coalesce(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-coalesce")
出力されたものは次のとおりです。
Documents/ tmp/ one-file-coalesce/ _SUCCESS part-00000-c7521799-e6d8-498d-b857-2aba7f56533a-c000.csv
coalesceでは、特定のファイル名を設定することはできません(フォルダ名をカスタマイズするだけです)。 単一のファイルを出力するメソッドにアクセスするには、spark-dariaを使用する必要があります。Spark-dariaで定義されているDariaWriters.writeSingleFile
関数を使用して、特定のファイル名を持つ単一のファイルを書き出すことができます。
特定の名前を持つファイルを書き出す
DataFrameの内容を~/Documents/better/mydata.csv
ファイルに書き込むコードを次に示します。
import com.github.mrpowers.spark.daria.sql.DariaWritersDariaWriters.writeSingleFile( df = df, format = "csv", sc = spark.sparkContext, tmpFolder = sys.env("HOME") + "/Documents/better/tmp", filename = sys.env("HOME") + "/Documents/better/mydata.csv")
writeSingleFile
メソッドは、複雑な実装の詳細を気にせずにファイルに名前を付けましょう。
writeSingleFile
はrepartition(1)
とHadoopファイルシステムメソッドを使用しています。 すべてのHadoop filesystemメソッドは、任意のSparkランタイム環境で使用できます。
他のファイルシステムとの互換性
ファイルを移動、名前変更、または削除するときは、Hadoop filesystemメソッドを使用することをお勧めします。 writeSingleFile
はローカルのファイルシステムとS3で動作します。 この方法は、SparkをローカルまたはDatabricks notebookで実行する場合に使用できます。
クロスプラットフォームではないこの問題に対する他の解決策があります。 Databricks notebookでのみ動作するソリューション、またはS3でのみ動作するソリューション、またはUnixライクなオペレーティングシステムでのみ動作するソリューションがあります。
Hadoop filesystemメソッドは動作するのに不器用ですが、最良のオプションは複数のプラットフォームで動作する原因となります。この回答で説明されているように、writeSingleFile
メソッドはfs.rename()
Hadoopメソッドを使用します。 ここにpsuedocodeがあります:
val src = new Path("s3a://bucket/data/src")val dest = new Path("s3a://bucket/data/dest")val conf = sc.hadoopConfiguration // assuming sc = spark contextval fs = src.getFileSystem(conf)fs.rename(src, dest)
copyMerge
Hadoop2には、この問題のエレガントな解決策であるFileUtil.copyMerge()
メソッドがありますが、このメソッドは非推奨であり、Hadoop3では削除されます。 このスレッドには、Hadoop3ユーザーのためにcopyMerge
を再実装する答えがあります。
いずれにしても、FileUtil.copyMerge()
メソッドに依存するコードを記述しないでください。 SparkがHadoop3にアップグレードすると、メソッドにアクセスできなくなり、将来未知の時間に壊れる非推奨のメソッドに依存したくないことを知っています。
次のステップ
通常は複数のファイルを並行して書き出す必要がありますが、まれに単一のファイルを書き出す場合は、spark-dariawriteSingleFile
メソッドが役立
複雑なHadoopファイルシステムロジックを分離してテストされたヘルパーメソッドでラップするように最善を尽くしてくださ Hadoopファイルシステム操作とSparkコードを同じ方法で組み合わせると、コードが複雑になります。