Desempenho da faísca para Scala vs Python

Prefiro Python a Scala. Mas, como Spark é nativamente escrito em Scala, eu estava esperando meu código para correr mais rápido no Scala do que a versão Python por razões óbvias.

Com essa suposição, pensei em aprender e escrever a versão Scala de algum código de pré-processamento muito comum para cerca de 1 GB de dados. Os dados são retirados do Concurso SpringLeaf em [[9]] Kaggle . Apenas para dar uma visão geral dos dados (ele contém 1936 dimensões e 145232 linhas). Os dados são compostos por: vários tipos, por exemplo, int, float, string, boolean. Eu estou usando 6 núcleos de 8 para processamento de faíscas; é por isso que eu usei minPartitions=6 de modo que cada núcleo tem algo para processar.

Código Scala

val input = sc.textFile("train.csv", minPartitions=6)

val input2 = input.mapPartitionsWithIndex { (idx, iter) => 
  if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"

def separateCols(line: String): Array[String] = {
  val line2 = line.replaceAll("true", "1")
  val line3 = line2.replaceAll("false", "0")
  val vals: Array[String] = line3.split(",")

  for((x,i) <- vals.view.zipWithIndex) {
    vals(i) = "VAR_%04d".format(i) + delim1 + x
  }
  vals
}

val input3 = input2.flatMap(separateCols)

def toKeyVal(line: String): (String, String) = {
  val vals = line.split(delim1)
  (vals(0), vals(1))
}

val input4 = input3.map(toKeyVal)

def valsConcat(val1: String, val2: String): String = {
  val1 + "," + val2
}

val input5 = input4.reduceByKey(valsConcat)

input5.saveAsTextFile("output")

Código Python

input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'


def drop_first_line(index, itr):
  if index == 0:
    return iter(list(itr)[1:])
  else:
    return itr

input2 = input.mapPartitionsWithIndex(drop_first_line)

def separate_cols(line):
  line = line.replace('true', '1').replace('false', '0')
  vals = line.split(',')
  vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
           for e, val in enumerate(vals)]
  return vals2


input3 = input2.flatMap(separate_cols)

def to_key_val(kv):
  key, val = kv.split(DELIM_1)
  return (key, val)
input4 = input3.map(to_key_val)

def vals_concat(v1, v2):
  return v1 + ',' + v2

input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')

Desempenho Scala Fase 0 (38 minutos), Fase 1 (18 segundos) enter image description here

Desempenho Em Python Fase 0( 11 minutos), Fase 1 (7 segundos) enter image description here

Ambos produzem uma visualização DAG diferente. gráficos (devido aos quais ambas as imagens mostram diferentes funções do estágio 0 para Scala (map) e Python (reduceByKey))

mas, essencialmente, ambos os códigos tentam transformar os dados em (dimension_id, cadeia de valores) RDD e gravar no disco. A saída será usada para calcular várias estatísticas para cada dimensão.

Performance wise, Scala code for this real data like this seems to run 4 times slower than the Python version. A boa notícia para mim é que me deu uma boa motivação para fica com o Python. A má notícia é que não percebi bem porquê?

Author: Community, 2015-09-08

1 answers


A resposta original a discutir o código pode ser encontrada abaixo.

Em primeiro lugar, você tem que distinguir entre diferentes tipos de API, Cada um com suas próprias considerações de desempenho.

RDD API

(estruturas puras em Python com orquestração baseada em JVM)

Este é o componente que será mais afectado pelo desempenho do código Python e pelos detalhes da implementação PySpark. Enquanto o desempenho em Python é bastante é pouco provável que seja um problema, existem pelo menos alguns factores que tem de considerar:

  • Overhead of JVM communication. Praticamente todos os dados que vêm de e para o executor Python têm que ser passados através de um socket e um trabalhador JVM. Embora esta seja uma comunicação local relativamente eficiente, ela ainda não é livre.
  • Executores baseados em processos (Python) versus executores baseados em thread (single JVM multiple threads) (Scala). Cada executor Python funciona em seu próprio processo. Como um lado effect, fornece um isolamento mais forte do que a sua contraparte JVM e algum controlo sobre o ciclo de vida do executor, mas potencialmente um uso de memória significativamente maior:
      Pegada de memória do intérprete
  • pegada das bibliotecas carregadas
  • Difusão menos eficiente (cada processo requer a sua própria cópia de uma emissão)
  • Desempenho do próprio código Python. Geralmente falando Scala é mais rápido do que Python, mas vai variar de tarefa para tarefa. Além disso, você tem várias opções, incluindo JITs como Numba, C extensões (Cython) ou bibliotecas especializadas como Theano. Finalmente, se não utilizar ML / MLlib (ou simplesmente pilha NumPy), considere o PyPy como um interpretador alternativo. Ver SPARK-3094 .

  • a configuração do PySpark oferece a opção spark.python.worker.reuse que pode ser usada para escolher entre procurar o processo Python para cada tarefa e reutilizar o processo existente. O esta última opção parece ser útil para evitar a cobrança de lixo caro (é mais uma impressão do que um resultado de testes sistemáticos), enquanto a primeira (padrão) é ideal para no caso de Transmissões e importações caras.
  • a contagem de referência, utilizada como método de recolha de lixo de primeira linha em CPython, funciona muito bem com cargas típicas de faísca (processamento em fluxo, sem ciclos de referência) e reduz o risco de longo GC pausa.
  • MLlib

    (execução mista em Python e JVM)

    As considerações básicas são praticamente as mesmas que antes com algumas questões adicionais. Embora as estruturas básicas usadas com o MLlib sejam objetos RDD em Python, todos os algoritmos são executados diretamente usando Scala.

    Significa um custo adicional de conversão de objetos Python para objetos Scala e ao contrário, aumento do uso de memória e algumas limitações adicionais que vamos cobrir tarde. A partir de agora (Faísca 2.x), a API baseada em RDD está em Modo de manutenção e está programada para ser removida na Spark 3.0 .

    DataFrame API e Spark ML

    (execução JVM com código Python limitado ao driver)

    Estas são provavelmente a melhor escolha para tarefas normais de processamento de dados. Uma vez que o código Python é principalmente limitado a operações lógicas de alto nível no driver, não deve haver diferença de desempenho entre Python e Scala.

    Uma única excepção é o uso de UDFs em Python que são significativamente menos eficientes do que os seus equivalentes Scala. Embora haja alguma chance de melhorias (houve um desenvolvimento substancial no Spark 2.0.0), a maior limitação é o roundtrip completo entre a representação interna (JVM) e o interpretador Python. Se possível, você deve favorecer uma composição de expressões incorporadas (exemplo . O comportamento UDF em Python foi melhorado no Spark 2.0, mas ele ainda é subóptima em comparação com a execução nativa. Isto pode melhorar no futuro com a introdução das UDFs vectorizadas (SPARK-21190) .

    Certifique-se também de evitar a passagem desnecessária de dados entre DataFrames e RDDs. Isso requer serialização e deseralização dispendiosas, para não mencionar a transferência de dados de e para o interpretador Python.

    Vale a pena notar que as chamadas Py4J têm uma latência muito alta. Isto inclui chamadas simples como:
    from pyspark.sql.functions import col
    
    col("foo")
    

    Normalmente, é não deve importar (overhead é constante e não depende da quantidade de dados), mas no caso de aplicações em tempo real, você pode considerar Cache/reutilização de wrappers Java.

    Gráficos e conjuntos de dados de faísca

    Por agora (faísca)1.6 2.1) nenhum deles fornece API PySpark para que você possa dizer que PySpark é infinitamente pior do que Scala. GraphX

    Na prática, o desenvolvimento do GraphX parou quase completamente e o projecto está actualmente no modo de manutenção com Os bilhetes para o JIRA estão fechados, porque não resolvem nada.. GraphFrames a biblioteca oferece uma biblioteca alternativa de processamento de grafos com ligações em Python.

    Conjunto

    Subjetivamente falando, não há muito lugar para tipagem estática Datasets em Python e mesmo que houvesse a implementação Scala atual é muito simplista e não fornece os mesmos benefícios de desempenho que DataFrame.

    Streaming

    Pelo que vi até agora, gostaria muito. recomenda a utilização do Scala em vez do Python. Pode mudar no futuro se a PySpark conseguir suporte para fluxos estruturados, mas agora a API Scala parece ser muito mais robusta, abrangente e eficiente. A minha experiência é bastante limitada.

    Transmissão estruturada em Faísca 2.x parece reduzir a diferença entre as línguas, mas por enquanto ainda está em seus primeiros dias. No entanto, a API baseada em RDD já é referenciada como "streaming legado" na documentação da Base de dados (data de acesso 2017-03-03)), pelo que é razoável esperar mais esforços de Unificação.

    Considerações de não desempenho

    Paridade das funcionalidades Nem todas as faíscas são expostas através da PySpark API. Certifique-se de verificar se as partes que você precisa já estão implementadas e tentar compreender possíveis limitações.

    É particularmente importante quando se usa MLlib e contextos mistos semelhantes (ver chamando a Função Java/Scala de uma tarefa). Para ser justo, algumas partes da API PySpark, como mllib.linalg, fornece um conjunto mais abrangente de métodos do que Scala.

    Desenho da API

    A API de PySpark reflecte de perto a sua contraparte Scala e, como tal, não é exactamente Pythonic. Significa que é muito fácil mapear entre linguagens, mas ao mesmo tempo, o código Python pode ser significativamente mais difícil de entender.

    Arquitectura complexa O fluxo de dados de PySpark é relativamente complexo em comparação com a execução pura da JVM. É muito mais difícil raciocinar sobre Programas PySpark ou depurar. Alem pelo menos a compreensão básica de Scala e JVM em geral é praticamente uma obrigação. Faísca 2.x e para além

    A mudança em curso para a API Dataset, com a API RDD congelada, traz oportunidades e desafios para os utilizadores do Python. Enquanto partes de alto nível da API são muito mais fáceis de expor em Python, as características mais avançadas são praticamente impossíveis de ser usadas diretamente.

    Além disso, as funções nativas em Python continuam a ser cidadãos de segunda classe no mundo SQL. Esperemos que isto melhore no futuro com a serialização da seta do Apache ( os actuais esforços visam os dados collection mas UDF serde é um objetivo a longo prazo {[[156]}).

    Para projectos que dependam fortemente da base de código em Python, alternativas puras em Python (como Dask ou Ray) poderiam ser uma alternativa interessante.

    Não tem de ser um contra o outro. A API Spark DataFrame (SQL, Dataset) fornece uma forma elegante de integrar Código Scala / Java na aplicação PySpark. Você pode usar DataFrames para expor dados a um código JVM nativo e ler de volta os resultados. Eu expliquei algumas opções em algum outro lugar e você pode encontrar um exemplo de trabalho de Python-Scala roundtrip em como usar uma classe Scala dentro do Pyspark .

    Pode ainda ser aumentado pela introdução de tipos definidos pelo Utilizador (ver como definir o esquema para o tipo personalizado na Spark SQL?).


    Qual é o problema com o código fornecido em a questão

    (Disclaimer: Pythonista point of view. Provavelmente perdi alguns truques de Scala.)

    Em primeiro lugar, há uma parte do teu código que não faz sentido nenhum. Se você já tem (key, value) pares criados usando zipWithIndex ou enumerate Qual é o ponto em criar string só para dividi-lo logo depois? flatMap não funciona recursivamente então você pode simplesmente produzir tuplas e saltar após {[[18]} qualquer coisa. Outra parte que acho problemática é ... reduceByKey. De um modo geral, reduceByKey é útil se a aplicação da função agregada pode reduzir a quantidade de dados que tem de ser baralhado. Uma vez que você simplesmente concatenar cordas não há nada a ganhar aqui. Ignorando coisas de baixo nível, como o número de referências, a quantidade de dados que você tem que Transferir é exatamente a mesma que para groupByKey. Normalmente, não me preocuparia com isso, mas, pelo que sei, é um estrangulamento no seu código Scala. Juntar as cordas na JVM é uma operação bastante dispendiosa (see for example: Is string concatenation in scala as costly as it is in Java?). Isso significa que algo assim _.reduceByKey((v1: String, v2: String) => v1 + ',' + v2) que é equivalente a input4.reduceByKey(valsConcat) em seu código não é uma boa idéia.

    Se quiser evitar {[21] } pode tentar utilizar aggregateByKey com StringBuilder. Algo semelhante a isto deve funcionar.

    rdd.aggregateByKey(new StringBuilder)(
      (acc, e) => {
        if(!acc.isEmpty) acc.append(",").append(e)
        else acc.append(e)
      },
      (acc1, acc2) => {
        if(acc1.isEmpty | acc2.isEmpty)  acc1.addString(acc2)
        else acc1.append(",").addString(acc2)
      }
    )
    
    Mas duvido que valha a pena tanto alarido. Tendo em conta o que precede, reescrevi o seu código como

    Scala:

    val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
      (idx, iter) => if (idx == 0) iter.drop(1) else iter
    }
    
    val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
      case ("true", i) => (i, "1")
      case ("false", i) => (i, "0")
      case p => p.swap
    })
    
    val result = pairs.groupByKey.map{
      case (k, vals) =>  {
        val valsString = vals.mkString(",")
        s"$k,$valsString"
      }
    }
    
    result.saveAsTextFile("scalaout")
    

    Python:

    def drop_first_line(index, itr):
        if index == 0:
            return iter(list(itr)[1:])
        else:
            return itr
    
    def separate_cols(line):
        line = line.replace('true', '1').replace('false', '0')
        vals = line.split(',')
        for (i, x) in enumerate(vals):
            yield (i, x)
    
    input = (sc
        .textFile('train.csv', minPartitions=6)
        .mapPartitionsWithIndex(drop_first_line))
    
    pairs = input.flatMap(separate_cols)
    
    result = (pairs
        .groupByKey()
        .map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))
    
    result.saveAsTextFile("pythonout")
    

    Resultados

    No modo local[6] [Intel (R) Xeon (R) CPU E3-1245 V2 @ 3, 40 GHz] com memória 4GB por executor que toma (n = 3):

    • Scala-média: 250, 00 s, stdev: 12, 49
    • Python-média: 246, 66 s, stdev: 1, 15
    Tenho quase a certeza que a maior parte desse tempo é gasto em baralhar, serializar, deserializar e outras tarefas secundárias. Só por diversão, aqui está a ingenuidade código único em Python que executa a mesma tarefa nesta máquina em menos de um minuto:
    def go():
        with open("train.csv") as fr:
            lines = [
                line.replace('true', '1').replace('false', '0').split(",")
                for line in fr]
        return zip(*lines[1:])
    
     269
    Author: zero323, 2017-09-01 10:44:44