Como salvar um DataFrame spark como csv em disco?
por exemplo, o resultado disto:
df.filter("project = 'en'").select("title","count").groupBy("title").sum()
devolveria uma matriz.
Como gravar um ficheiro csv em disco ?3 answers
A faísca Apache não suporta a saída CSV nativa no disco.
No entanto, tem quatro soluções disponíveis:
-
Pode converter o seu Dataframe num RDD:
def convertToReadableString(r : Row) = ??? df.rdd.map{ convertToReadableString }.saveAsTextFile(filepath)
Isto irá criar um ficheiro de pastas. Sob a localização do ficheiro, irá encontrar ficheiros de partições (p. ex. part-000*)
O que costumo fazer se quero juntar todas as partições num grande CSV écat filePath/part* > mycsvfile.csv
Alguns irão usar
coalesce(1,false)
para criar uma partição do RDD. Geralmente é uma má prática, uma vez que pode sobrecarregar o motorista puxando todos os dados que você está coletando para ele.Note que {[6] } vai devolver um
RDD[Row]
. -
Pode usar bases de dados spark-csv biblioteca:
-
Faísca 1, 4+:
df.write.format("com.databricks.spark.csv").save(filepath)
-
Faísca 1.3:
df.save(filepath,"com.databricks.spark.csv")
-
-
Com Faísca 2.x o pacote
spark-csv
não é necessário, uma vez que está incluído no Faisca.df.write.format("csv").save(filepath)
Você pode converter para moldura de dados de Pandas local e usar o método
to_csv
(Apenas PySpark).
Nota: As Soluções 1, 2 e 3 irão resultar em ficheiros de formato CSV (part-*
) gerados pela API Hadoop subjacente que chama quando invocar save
. Você terá um arquivo part-
por partição.
Verifiquei o código spark-csv e encontrei o código responsável pela conversão do dataframe em csv bruto RDD[String]
em com.databricks.spark.csv.CsvSchemaRDD
.
sc.textFile
e o fim do método relevante.
Copiei esse código e removi as últimas linhas com sc.textFile
e voltei. RDD directamente em vez disso.
O meu código:
/*
This is copypasta from com.databricks.spark.csv.CsvSchemaRDD
Spark's code has perfect method converting Dataframe -> raw csv RDD[String]
But in last lines of that method it's hardcoded against writing as text file -
for our case we need RDD.
*/
object DataframeToRawCsvRDD {
val defaultCsvFormat = com.databricks.spark.csv.defaultCsvFormat
def apply(dataFrame: DataFrame, parameters: Map[String, String] = Map())
(implicit ctx: ExecutionContext): RDD[String] = {
val delimiter = parameters.getOrElse("delimiter", ",")
val delimiterChar = if (delimiter.length == 1) {
delimiter.charAt(0)
} else {
throw new Exception("Delimiter cannot be more than one character.")
}
val escape = parameters.getOrElse("escape", null)
val escapeChar: Character = if (escape == null) {
null
} else if (escape.length == 1) {
escape.charAt(0)
} else {
throw new Exception("Escape character cannot be more than one character.")
}
val quote = parameters.getOrElse("quote", "\"")
val quoteChar: Character = if (quote == null) {
null
} else if (quote.length == 1) {
quote.charAt(0)
} else {
throw new Exception("Quotation cannot be more than one character.")
}
val quoteModeString = parameters.getOrElse("quoteMode", "MINIMAL")
val quoteMode: QuoteMode = if (quoteModeString == null) {
null
} else {
QuoteMode.valueOf(quoteModeString.toUpperCase)
}
val nullValue = parameters.getOrElse("nullValue", "null")
val csvFormat = defaultCsvFormat
.withDelimiter(delimiterChar)
.withQuote(quoteChar)
.withEscape(escapeChar)
.withQuoteMode(quoteMode)
.withSkipHeaderRecord(false)
.withNullString(nullValue)
val generateHeader = parameters.getOrElse("header", "false").toBoolean
val headerRdd = if (generateHeader) {
ctx.sparkContext.parallelize(Seq(
csvFormat.format(dataFrame.columns.map(_.asInstanceOf[AnyRef]): _*)
))
} else {
ctx.sparkContext.emptyRDD[String]
}
val rowsRdd = dataFrame.rdd.map(row => {
csvFormat.format(row.toSeq.map(_.asInstanceOf[AnyRef]): _*)
})
headerRdd union rowsRdd
}
}
Eu tive um problema similar onde eu tive que salvar o conteúdo do dataframe para um arquivo csv de nome que eu defini. df.write("csv").save("<my-path>")
estava a criar directório do que ficheiro. Por conseguinte, temos de encontrar as seguintes soluções.
A maior parte do código é tirada do seguinte dataframe-to-csv com pequenas modificações na lógica.
def saveDfToCsv(df: DataFrame, tsvOutput: String, sep: String = ",", header: Boolean = false): Unit = {
val tmpParquetDir = "Posts.tmp.parquet"
df.repartition(1).write.
format("com.databricks.spark.csv").
option("header", header.toString).
option("delimiter", sep).
save(tmpParquetDir)
val dir = new File(tmpParquetDir)
val newFileRgex = tmpParquetDir + File.separatorChar + ".part-00000.*.csv"
val tmpTsfFile = dir.listFiles.filter(_.toPath.toString.matches(newFileRgex))(0).toString
(new File(tmpTsvFile)).renameTo(new File(tsvOutput))
dir.listFiles.foreach( f => f.delete )
dir.delete
}