Como escrever testes de unidade em Spark 2.0+?

Tenho tentado encontrar uma forma razoável de testar a estrutura de testes JUnit. Embora pareça haver bons exemplos para SparkContext, Eu não consegui descobrir como obter um exemplo correspondente trabalhando para SparkSession, Mesmo que seja usado em vários lugares internamente em spark-testing-base. Eu ficaria feliz em tentar uma solução que não use spark-testing-base também se não for realmente o caminho certo para ir aqui.

caso de teste simples ( projecto MWE completo com build.sbt):

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.junit.Test
import org.scalatest.FunSuite

import org.apache.spark.sql.SparkSession


class SessionTest extends FunSuite with DataFrameSuiteBase {

  implicit val sparkImpl: SparkSession = spark

  @Test
  def simpleLookupTest {

    val homeDir = System.getProperty("user.home")
    val training = spark.read.format("libsvm")
      .load(s"$homeDir\\Documents\\GitHub\\sample_linear_regression_data.txt")
    println("completed simple lookup test")
  }

}

o resultado de correr isto com o JUnit é um NPE na linha de carga:

java.lang.NullPointerException
    at SessionTest.simpleLookupTest(SessionTest.scala:16)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

Note que não deve importar que o ficheiro que está a ser carregado exista ou não; numa SparkSession devidamente configurada, será lançado um erro mais sensato .

Author: bbarker, 2017-05-02

5 answers

Obrigado por colocar esta questão pendente. Por alguma razão, quando se trata de faísca, todo mundo fica tão preso na análise que eles se esquecem sobre as grandes práticas de engenharia de software que surgiram nos últimos 15 anos ou mais. É por isso que fazemos questão de discutir testes e integração contínua (entre outras coisas como DevOps) em nosso curso.

um rápido desvio na terminologia

A true unit test means você tem controle total sobre todos os componentes do teste. Não pode haver interação com bases de dados, chamadas de descanso, sistemas de arquivos, ou até mesmo o relógio do sistema; tudo tem que ser "dobrado" (por exemplo, zombaria, stubbed, etc) como Gerard Mezaros coloca em padrões de teste xUnit. Sei que isto parece semântica, mas realmente importa. Não entender esta é uma das principais razões pelas quais você vê falhas de teste intermitentes na integração contínua.

Ainda Podemos Unir-Nos. Ensaio

Tendo em conta este entendimento, é impossível testar uma unidade. No entanto, ainda há um lugar para testes de unidade ao desenvolver análises.

Considere uma operação simples:

rdd.map(foo).map(bar)

Aqui foo e bar são funções simples. Estes podem ser testados em unidade da forma normal, e devem estar com tantos casos de canto quanto você pode reunir. Afinal de contas, por que eles se importam onde eles estão recebendo suas entradas a partir de se é um equipamento de teste ou um RDD?

Não te esqueças da carapaça.

Isto não é um teste per se[35], mas nestes estágios iniciais você também deve estar experimentando na Concha de faísca para descobrir suas transformações e, especialmente, as consequências de sua abordagem. Por exemplo, você pode examinar planos de Consulta física e lógica, estratégia de particionamento e preservação, e o estado de seus dados com muitas funções diferentes como toDebugString, explain, glom, show, printSchema, e assim no. Vou deixar-te explorar isso.

Você também pode definir o seu mestre para local[2] na Concha de faísca e em seus testes para identificar quaisquer problemas que possam surgir apenas uma vez que você começar a distribuir o trabalho.

ensaio de integração com faísca

Agora, as coisas divertidas.

A fim de teste de integração Spark depois de se sentir confiante na qualidade das suas funções auxiliares e RDD/DataFrame lógica de transformação, é fundamental fazer alguns coisas (independentemente da ferramenta de compilação e do quadro de testes):

    Aumenta a memória da JVM. Activar a bifurcação, mas desactivar a execução paralela. Use a estrutura de teste para acumular os testes de integração da faísca em suites, e inicialize o SparkContext antes de todos os testes e pare-o depois de todos os testes.

Com o ScalaTest, você pode misturar em {[[17]} (que eu prefiro geralmente) ou BeforeAndAfterEach como @ShankarKoirala faz para inicializar e derrubar artefatos de faísca. Eu sei que isto é ... um lugar razoável para abrir uma excepção, mas não gosto dos mutáveis que tens de usar.

O Padrão Do Empréstimo

Outra abordagem é utilizar o padrão de empréstimos .

Por exemplo (usando o ScalaTest):

class MySpec extends WordSpec with Matchers with SparkContextSetup {
  "My analytics" should {
    "calculate the right thing" in withSparkContext { (sparkContext) =>
      val data = Seq(...)
      val rdd = sparkContext.parallelize(data)
      val total = rdd.map(...).filter(...).map(...).reduce(_ + _)

      total shouldBe 1000
    }
  }
}

trait SparkContextSetup {
  def withSparkContext(testMethod: (SparkContext) => Any) {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("Spark test")
    val sparkContext = new SparkContext(conf)
    try {
      testMethod(sparkContext)
    }
    finally sparkContext.stop()
  }
} 

Como podem ver, o padrão do empréstimo faz uso de funções de ordem superior para "emprestar" o SparkContext para o teste e depois para se livrar dele depois de feito.

Programação Orientada Para O Sofrimento (Obrigado, Nathan))

É uma questão de preferência, mas prefiro usar o padrão do empréstimo e transferir as coisas o tempo que puder antes de introduzir outro quadro. Além de apenas tentar ficar leve, frameworks às vezes adicionar um monte de "magia" que faz as falhas de teste de depuração difícil de raciocinar. Então eu tomo uma abordagem de Programação Orientada para o sofrimento--onde eu evito adicionar um novo quadro até que a dor de não tê-lo seja demais para suportar. Mas, mais uma vez, a decisão é tua.

A melhor escolha para esse quadro alternativo é claro spark-testing-base {[[42]} como @ShankarKoirala mencionou. Nesse caso, o teste acima se pareceria com este:

class MySpec extends WordSpec with Matchers with SharedSparkContext {
      "My analytics" should {
        "calculate the right thing" in { 
          val data = Seq(...)
          val rdd = sc.parallelize(data)
          val total = rdd.map(...).filter(...).map(...).reduce(_ + _)

          total shouldBe 1000
        }
      }
 }
Repara como não tive de fazer nada para lidar com o ... SharedSparkContext deu-me tudo isso ... com sc como o {16]} ... de graça. Pessoalmente, embora eu não traria esta dependência apenas para este propósito, uma vez que o padrão de empréstimo faz exatamente o que eu preciso para isso. Tambem, com tanta imprevisibilidade que acontece com os sistemas distribuídos, pode ser uma dor real ter que rastrear através da magia que acontece no código fonte de uma biblioteca de terceiros quando as coisas correm mal em integração contínua. Agora, onde realmente brilha a base de testes de faísca com os ajudantes à base de Hadoop como HDFSClusterLike e YARNClusterLike. Misturar essas características pode realmente poupar-lhe um monte de dor de configuração. Outro lugar onde brilha é com o Scalacheck. propriedades e geradores -- assumindo, claro, que você entende como os testes baseados em Propriedades funcionam e por que isso é útil. Mas mais uma vez, eu pessoalmente adiaria usá-lo até que a minha análise e os meus testes alcancem esse nível de sofisticação.

"só um Sith lida com absolutos."--Obi-Wan Kenobi

Claro que também não tens de escolher uma ou outra. Talvez você poderia usar a abordagem do padrão de empréstimo para a maioria de seus testes e Apenas para alguns testes mais rigorosos. A escolha não é binária; você pode fazer as duas coisas.

testes de integração com faísca

Por fim, gostaria de apresentar um trecho do que uma configuração de teste de integração SparkStreaming com valores de memória pode parecer sem a base de teste de faísca.:
val sparkContext: SparkContext = ...
val data: Seq[(String, String)] = Seq(("a", "1"), ("b", "2"), ("c", "3"))
val rdd: RDD[(String, String)] = sparkContext.parallelize(data)
val strings: mutable.Queue[RDD[(String, String)]] = mutable.Queue.empty[RDD[(String, String)]]
val streamingContext = new StreamingContext(sparkContext, Seconds(1))
val dStream: InputDStream = streamingContext.queueStream(strings)
strings += rdd
Isto é mais simples do que parece. Ele realmente apenas transforma uma sequência de dados em uma fila para alimentar o DStream. A maior parte é realmente apenas configuração boilerplate que funciona com a Spark APIs. De qualquer forma, você pode comparar isso com StreamingSuiteBase como encontrado em spark-testing-base para decidir qual preferes. Este pode ser o meu posto mais longo de sempre, por isso vou deixá-lo aqui. Espero que outros se juntem a outras ideias para ajudar a melhorar a qualidade das nossas análises com as mesmas práticas ágeis de engenharia de software que melhoraram todo o desenvolvimento de aplicações.

E com desculpas pelo plug desavergonhado, você pode verificar o nosso curso Analytics com Apache Spark , onde abordamos muitas dessas ideias e muito mais. Esperamos ter uma versão online em breve.

 52
Author: Vidya, 2017-05-23 12:02:21

Podes escrever um teste simples com Fundsuite e antes e depois de cada um como em baixo

class Tests extends FunSuite with BeforeAndAfterEach {

  var sparkSession : SparkSession = _
  override def beforeEach() {
    sparkSession = SparkSession.builder().appName("udf testings")
      .master("local")
      .config("", "")
      .getOrCreate()
  }

  test("your test name here"){
    //your unit test assert here like below
    assert("True".toLowerCase == "true")
  }

  override def afterEach() {
    sparkSession.stop()
  }
}

Você não precisa criar uma função no teste você pode simplesmente escrever como

test ("test name") {//implementation and assert}
O Holden Karau escreveu um teste muito bom.

Você precisa verificar abaixo é um exemplo simples

class TestSharedSparkContext extends FunSuite with SharedSparkContext {

  val expectedResult = List(("a", 3),("b", 2),("c", 4))

  test("Word counts should be equal to expected") {
    verifyWordCount(Seq("c a a b a c b c c"))
  }

  def verifyWordCount(seq: Seq[String]): Unit = {
    assertResult(expectedResult)(new WordCount().transform(sc.makeRDD(seq)).collect().toList)
  }
}
Espero que isto ajude!
 17
Author: Shankar Koirala, 2017-05-06 12:05:17
Gosto de criar uma característica SparkSessionTestWrapper que pode ser misturada nas classes de teste. A abordagem de Shankar funciona, mas é proibitivamente lenta para suites de teste com vários arquivos.
import org.apache.spark.sql.SparkSession

trait SparkSessionTestWrapper {

  lazy val spark: SparkSession = {
    SparkSession.builder().master("local").appName("spark session").getOrCreate()
  }

}

O traço pode ser usado como se segue:

class DatasetSpec extends FunSpec with SparkSessionTestWrapper {

  import spark.implicits._

  describe("#count") {

    it("returns a count of all the rows in a DataFrame") {

      val sourceDF = Seq(
        ("jets"),
        ("barcelona")
      ).toDF("team")

      assert(sourceDF.count === 2)

    }

  }

}

Verifica o projecto spark-specpara um exemplo da vida real que usa a abordagem SparkSessionTestWrapper.

Actualizar

A Biblioteca base de testes de faíscaadiciona automaticamente a transmissão quando certas características são misturadas para a classe de teste(por exemplo, quando DataFrameSuiteBase é misturado, você terá acesso à SparkSession através da variável spark).

Criei uma biblioteca de testes separada chamada spark-fast-tests para dar aos utilizadores o controlo total do SparkSession ao executar os seus testes. Não acho que uma biblioteca auxiliar de testes deva definir o SparkSession. Os utilizadores devem ser capazes de iniciar e parar o seu SparkSession como entenderem (gosto de criar um SparkSession e usá-lo ao longo do conjunto de testes executar).

Eis um exemplo do método de testes rápidos em acção:
import com.github.mrpowers.spark.fast.tests.DatasetComparer

class DatasetSpec extends FunSpec with SparkSessionTestWrapper with DatasetComparer {

  import spark.implicits._

    it("aliases a DataFrame") {

      val sourceDF = Seq(
        ("jose"),
        ("li"),
        ("luisa")
      ).toDF("name")

      val actualDF = sourceDF.select(col("name").alias("student"))

      val expectedDF = Seq(
        ("jose"),
        ("li"),
        ("luisa")
      ).toDF("student")

      assertSmallDatasetEquality(actualDF, expectedDF)

    }

  }

}
 8
Author: Powers, 2018-09-01 20:57:04

Uma Vez Que faísca 1, 6 podias usar SharedSQLContext essa faísca utiliza-se para os seus próprios ensaios unitários:

class YourAppTest extends SharedSQLContext {

  var app: YourApp = _

  protected override def beforeAll(): Unit = {
    super.beforeAll()

    app = new YourApp
  }

  protected override def afterAll(): Unit = {
    super.afterAll()
  }

  test("Your test") {
    val df = sqlContext.read.json("examples/src/main/resources/people.json")

    app.run(df)
  }

Desde faísca 2.3 SharedSparkSession está disponível:

class YourAppTest extends SharedSparkSession {

  var app: YourApp = _

  protected override def beforeAll(): Unit = {
    super.beforeAll()

    app = new YourApp
  }

  protected override def afterAll(): Unit = {
    super.afterAll()
  }

  test("Your test") {
    df = spark.read.json("examples/src/main/resources/people.json")

    app.run(df)
  }

Actualizar:

Dependência de Maven:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql</artifactId>
  <version>SPARK_VERSION</version>
  <type>test-jar</type>
  <scope>test</scope>
</dependency>

Dependência de SBT:

"org.apache.spark" %% "spark-sql" % SPARK_VERSION % Test classifier "tests"
 3
Author: Eugene Lopatkin, 2018-06-19 06:05:25

Eu poderia resolver o problema com o código abaixo

A dependência das colmeias é adicionada ao projecto pom

class DataFrameTest extends FunSuite with DataFrameSuiteBase{
        test("test dataframe"){
        val sparkSession=spark
        import sparkSession.implicits._
        var df=sparkSession.read.format("csv").load("path/to/csv")
        //rest of the operations.
        }
        }
 1
Author: sunitha, 2018-03-26 12:57:04