aparch spark, Notserializable exception: org. apache. hadoop. io. Text

Aqui está o meu código:
  val bg = imageBundleRDD.first()    //bg:[Text, BundleWritable]
  val res= imageBundleRDD.map(data => {
                                val desBundle = colorToGray(bg._2)        //lineA:NotSerializableException: org.apache.hadoop.io.Text
                                //val desBundle = colorToGray(data._2)    //lineB:everything is ok
                                (data._1, desBundle)
                             })
  println(res.count)

O LineB vai bem, mas a lineA mostra que: org.Apache.faisca.SparkException: Job aborted: Task not serializable: java. io. NotSerializableException: org. apache. hadoop. io. Text

tento usar o Kryo para resolver o meu problema, mas parece que nada mudou:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo) {
       kryo.register(classOf[Text])
       kryo.register(classOf[BundleWritable])
  }
}

System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "hequn.spark.reconstruction.MyRegistrator")
val sc = new SparkContext(...
Obrigado!!!

Author: rene, 2014-01-12

3 answers

Tive um problema semelhante quando o meu código Java estava a ler ficheiros de sequência que continham chaves de texto. Achei este post útil:

Http://apache-spark-user-list.1001560.n3.nabble.com/How-to-solve-java-io-NotSerializableException-org-apache-hadoop-io-Text-td2650.html

No meu caso, converti o texto para um texto usando o mapa:

JavaPairRDD<String, VideoRecording> mapped = videos.map(new PairFunction<Tuple2<Text,VideoRecording>,String,VideoRecording>() {
    @Override
    public Tuple2<String, VideoRecording> call(
            Tuple2<Text, VideoRecording> kv) throws Exception {
        // Necessary to copy value as Hadoop chooses to reuse objects
        VideoRecording vr = new VideoRecording(kv._2);
        return new Tuple2(kv._1.toString(), vr);
    }
});

Esteja ciente desta nota na API para o método sequenceFile no JavaSparkContext:

Nota: Porque o leitor de registos do Hadoop a classe re-usa o mesmo objecto Writável para cada registo, Cache directamente o RDD devolvido irá criar muitas referências ao mesmo objecto. Se planeia copiar directamente os objectos writable do Hadoop, deverá primeiro copiá-los com uma função de mapa.

 1
Author: Madhu, 2014-04-05 18:55:01

A razão pela qual o seu código tem o problema de serialização é que a sua configuração de Kryo, embora próxima, não está bem:

Alteração:

System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "hequn.spark.reconstruction.MyRegistrator")
val sc = new SparkContext(...

To:

val sparkConf = new SparkConf()
  // ... set master, appname, etc, then:
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "hequn.spark.reconstruction.MyRegistrator")

val sc = new SparkContext(sparkConf)
 0
Author: zuluchas, 2015-08-31 00:22:30

EmApache Spark enquanto lidamos com arquivos de sequência , Temos de seguir estas técnicas:

 -- Use Java equivalent Data Types in place of Hadoop data types.
 -- Spark Automatically converts the Writables into Java equivalent Types.

Ex:- We have a sequence file "xyz", here key type is say Text and value
is LongWritable. When we use this file to create an RDD, we need use  their 
java  equivalent data types i.e., String and Long respectively.

 val mydata = = sc.sequenceFile[String, Long]("path/to/xyz")
 mydata.collect

 0
Author: Naga, 2016-12-16 19:17:07