Como salvar um DataFrame spark como csv em disco?

por exemplo, o resultado disto:

df.filter("project = 'en'").select("title","count").groupBy("title").sum()

devolveria uma matriz.

Como gravar um ficheiro csv em disco ?

Author: Shaido, 2015-10-16

3 answers

A faísca Apache não suporta a saída CSV nativa no disco.

No entanto, tem quatro soluções disponíveis:

  1. Pode converter o seu Dataframe num RDD:

    def convertToReadableString(r : Row) = ???
    df.rdd.map{ convertToReadableString }.saveAsTextFile(filepath)
    

    Isto irá criar um ficheiro de pastas. Sob a localização do ficheiro, irá encontrar ficheiros de partições (p. ex. part-000*)

    O que costumo fazer se quero juntar todas as partições num grande CSV é
    cat filePath/part* > mycsvfile.csv
    

    Alguns irão usar coalesce(1,false) para criar uma partição do RDD. Geralmente é uma má prática, uma vez que pode sobrecarregar o motorista puxando todos os dados que você está coletando para ele.

    Note que {[6] } vai devolver um RDD[Row].

  2. Pode usar bases de dados spark-csv biblioteca:

    • Faísca 1, 4+:

      df.write.format("com.databricks.spark.csv").save(filepath)
      
    • Faísca 1.3:

      df.save(filepath,"com.databricks.spark.csv")
      
  3. Com Faísca 2.x o pacote spark-csv não é necessário, uma vez que está incluído no Faisca.

    df.write.format("csv").save(filepath)
    
  4. Você pode converter para moldura de dados de Pandas local e usar o método to_csv (Apenas PySpark).

Nota: As Soluções 1, 2 e 3 irão resultar em ficheiros de formato CSV (part-*) gerados pela API Hadoop subjacente que chama quando invocar save. Você terá um arquivo part- por partição.

 15
Author: eliasah, 2018-01-12 09:11:32
Tive um problema semelhante. Precisava de escrever o ficheiro csv no driver enquanto estava ligado ao cluster no modo cliente. Eu queria reutilizar o mesmo código de análise CSV que o Apache Spark para evitar erros potenciais.

Verifiquei o código spark-csv e encontrei o código responsável pela conversão do dataframe em csv bruto RDD[String] em com.databricks.spark.csv.CsvSchemaRDD.

Infelizmente, está marcado com sc.textFile e o fim do método relevante.

Copiei esse código e removi as últimas linhas com sc.textFile e voltei. RDD directamente em vez disso.

O meu código:

/*
  This is copypasta from com.databricks.spark.csv.CsvSchemaRDD
  Spark's code has perfect method converting Dataframe -> raw csv RDD[String]
  But in last lines of that method it's hardcoded against writing as text file -
  for our case we need RDD.
 */
object DataframeToRawCsvRDD {

  val defaultCsvFormat = com.databricks.spark.csv.defaultCsvFormat

  def apply(dataFrame: DataFrame, parameters: Map[String, String] = Map())
           (implicit ctx: ExecutionContext): RDD[String] = {
    val delimiter = parameters.getOrElse("delimiter", ",")
    val delimiterChar = if (delimiter.length == 1) {
      delimiter.charAt(0)
    } else {
      throw new Exception("Delimiter cannot be more than one character.")
    }

    val escape = parameters.getOrElse("escape", null)
    val escapeChar: Character = if (escape == null) {
      null
    } else if (escape.length == 1) {
      escape.charAt(0)
    } else {
      throw new Exception("Escape character cannot be more than one character.")
    }

    val quote = parameters.getOrElse("quote", "\"")
    val quoteChar: Character = if (quote == null) {
      null
    } else if (quote.length == 1) {
      quote.charAt(0)
    } else {
      throw new Exception("Quotation cannot be more than one character.")
    }

    val quoteModeString = parameters.getOrElse("quoteMode", "MINIMAL")
    val quoteMode: QuoteMode = if (quoteModeString == null) {
      null
    } else {
      QuoteMode.valueOf(quoteModeString.toUpperCase)
    }

    val nullValue = parameters.getOrElse("nullValue", "null")

    val csvFormat = defaultCsvFormat
      .withDelimiter(delimiterChar)
      .withQuote(quoteChar)
      .withEscape(escapeChar)
      .withQuoteMode(quoteMode)
      .withSkipHeaderRecord(false)
      .withNullString(nullValue)

    val generateHeader = parameters.getOrElse("header", "false").toBoolean
    val headerRdd = if (generateHeader) {
      ctx.sparkContext.parallelize(Seq(
        csvFormat.format(dataFrame.columns.map(_.asInstanceOf[AnyRef]): _*)
      ))
    } else {
      ctx.sparkContext.emptyRDD[String]
    }

    val rowsRdd = dataFrame.rdd.map(row => {
      csvFormat.format(row.toSeq.map(_.asInstanceOf[AnyRef]): _*)
    })

    headerRdd union rowsRdd
  }

}
 0
Author: Ajk, 2016-08-22 06:21:09

Eu tive um problema similar onde eu tive que salvar o conteúdo do dataframe para um arquivo csv de nome que eu defini. df.write("csv").save("<my-path>") estava a criar directório do que ficheiro. Por conseguinte, temos de encontrar as seguintes soluções. A maior parte do código é tirada do seguinte dataframe-to-csv com pequenas modificações na lógica.

def saveDfToCsv(df: DataFrame, tsvOutput: String, sep: String = ",", header: Boolean = false): Unit = {
    val tmpParquetDir = "Posts.tmp.parquet"

    df.repartition(1).write.
        format("com.databricks.spark.csv").
        option("header", header.toString).
        option("delimiter", sep).
        save(tmpParquetDir)

    val dir = new File(tmpParquetDir)
    val newFileRgex = tmpParquetDir + File.separatorChar + ".part-00000.*.csv"
    val tmpTsfFile = dir.listFiles.filter(_.toPath.toString.matches(newFileRgex))(0).toString
    (new File(tmpTsvFile)).renameTo(new File(tsvOutput))

    dir.listFiles.foreach( f => f.delete )
    dir.delete
    }
 0
Author: Jai Prakash, 2017-11-23 07:03:44