este blog explica como escrever um DataFrame para um único arquivo com Spark. Ele também descreve como escrever dados em um arquivo com um nome específico, o que é surpreendentemente desafiador.Escrever um único arquivo com faísca não é típico. Spark é projetado para escrever vários arquivos em paralelo. Escrever muitos arquivos ao mesmo tempo é mais rápido para grandes conjuntos de dados.
comportamento padrão
vamos criar um DataFrame, usar repartition(3)
para criar três partições de memória, e então escrever o arquivo para o disco.
val df = Seq("one", "two", "three").toDF("num")df .repartition(3) .write.csv(sys.env("HOME")+ "/Documents/tmp/some-files")
Aqui estão os arquivos que são gerados no disco.
Documents/ tmp/ some-files/ _SUCCESS part-00000-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00001-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv part-00002-b69460e8-fdc3-4593-bab4-bd15fa0dad98-c000.csv
Spark escreve um arquivo por partição de memória. Nós usamos repartition(3)
para criar três partições de memória, então três arquivos foram escritos.
escrever um ficheiro com repartição
podemos usar repartition(1)
escrever um único ficheiro.Aqui está o ficheiro que está escrito no disco.Não podemos controlar o nome do ficheiro que está escrito. Podemos controlar o nome do diretório, mas não o arquivo em si.
esta solução não é suficiente quando você quer escrever dados para um arquivo com um nome específico.
escrever um único arquivo com coalesce
também podemos usar coalesce(1)
para escrever um único arquivo.
df .coalesce(1) .write.csv(sys.env("HOME")+ "/Documents/tmp/one-file-coalesce")
aqui está o que é outputted.
Documents/ tmp/ one-file-coalesce/ _SUCCESS part-00000-c7521799-e6d8-498d-b857-2aba7f56533a-c000.csv
coalesce também não nos permite definir um nome de ficheiro específico (só nos permite personalizar o nome da pasta). Precisamos de usar a spark-daria para aceder a um método que dê origem a um único ficheiro.
escrever um ficheiro com um nome específico
pode utilizar a função DariaWriters.writeSingleFile
definida no spark-daria para escrever um único ficheiro com um nome de ficheiro específico.
aqui está o código que escreve o conteúdo de um DataFrame para o arquivo ~/Documents/better/mydata.csv
.
import com.github.mrpowers.spark.daria.sql.DariaWritersDariaWriters.writeSingleFile( df = df, format = "csv", sc = spark.sparkContext, tmpFolder = sys.env("HOME") + "/Documents/better/tmp", filename = sys.env("HOME") + "/Documents/better/mydata.csv")
the writeSingleFile
method let’s you name the file without concerning about complicated implementation details.
writeSingleFile
é usado repartition(1)
e métodos do sistema de ficheiros Hadoop por baixo do capô. Todos os métodos do sistema de arquivos Hadoop estão disponíveis em qualquer ambiente de execução Spark – você não precisa anexar quaisquer frascos separados.
compatibilidade com outros sistemas de ficheiros
é melhor usar os métodos do sistema de ficheiros Hadoop ao mover, mudar o nome ou apagar ficheiros, para que o seu código funcione em várias plataformas. writeSingleFile
trabalha no seu sistema de ficheiros local e em S3. Você pode usar esta abordagem ao executar Spark localmente ou em um caderno de dados.Existem outras soluções para este problema que não são plataformas cruzadas. Existem soluções que só funcionam em cadernos de dados, ou apenas funcionam em S3, ou apenas funcionam em um sistema operacional Unix-like.
os métodos do sistema de arquivos Hadoop são desajeitados para trabalhar, mas a melhor opção porque eles trabalham em várias plataformas.
o método writeSingleFile
usa o método fs.rename()
Hadoop, como descrito nesta resposta. Aqui está o psuedocode:
val src = new Path("s3a://bucket/data/src")val dest = new Path("s3a://bucket/data/dest")val conf = sc.hadoopConfiguration // assuming sc = spark contextval fs = src.getFileSystem(conf)fs.rename(src, dest)
copyMerge
Hadoop 2 tem um FileUtil.copyMerge()
método que uma solução elegante para este problema, mas este método está obsoleto e será removido no Hadoop 3. Há uma resposta neste tópico que reimplements copyMerge
para usuários Hadoop 3.
em qualquer caso, não escreva o código que se baseia no método FileUtil.copyMerge()
. Sabemos que o método será inacessível quando Spark upgrades para Hadoop 3 e você não quer confiar em um método depreciado que vai quebrar em algum momento desconhecido no futuro.
próximos passos
normalmente você vai querer escrever vários arquivos em paralelo, mas nas raras ocasiões em que você quer escrever um único arquivo, o método spark-daria writeSingleFile
vai ajudar.
tente o seu melhor para envolver a complexa lógica do sistema de ficheiros Hadoop em métodos auxiliares que são testados separados. Combinar as operações do sistema de ficheiros Hadoop e o código Spark no mesmo método tornará o seu código demasiado complexo.