aparch spark, Notserializable exception: org. apache. hadoop. io. Text
val bg = imageBundleRDD.first() //bg:[Text, BundleWritable]
val res= imageBundleRDD.map(data => {
val desBundle = colorToGray(bg._2) //lineA:NotSerializableException: org.apache.hadoop.io.Text
//val desBundle = colorToGray(data._2) //lineB:everything is ok
(data._1, desBundle)
})
println(res.count)
O LineB vai bem, mas a lineA mostra que: org.Apache.faisca.SparkException: Job aborted: Task not serializable: java. io. NotSerializableException: org. apache. hadoop. io. Text
tento usar o Kryo para resolver o meu problema, mas parece que nada mudou:
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
class MyRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo) {
kryo.register(classOf[Text])
kryo.register(classOf[BundleWritable])
}
}
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "hequn.spark.reconstruction.MyRegistrator")
val sc = new SparkContext(...
Obrigado!!!
3 answers
Tive um problema semelhante quando o meu código Java estava a ler ficheiros de sequência que continham chaves de texto. Achei este post útil:
No meu caso, converti o texto para um texto usando o mapa:
JavaPairRDD<String, VideoRecording> mapped = videos.map(new PairFunction<Tuple2<Text,VideoRecording>,String,VideoRecording>() {
@Override
public Tuple2<String, VideoRecording> call(
Tuple2<Text, VideoRecording> kv) throws Exception {
// Necessary to copy value as Hadoop chooses to reuse objects
VideoRecording vr = new VideoRecording(kv._2);
return new Tuple2(kv._1.toString(), vr);
}
});
Esteja ciente desta nota na API para o método sequenceFile no JavaSparkContext:
Nota: Porque o leitor de registos do Hadoop a classe re-usa o mesmo objecto Writável para cada registo, Cache directamente o RDD devolvido irá criar muitas referências ao mesmo objecto. Se planeia copiar directamente os objectos writable do Hadoop, deverá primeiro copiá-los com uma função de mapa.
A razão pela qual o seu código tem o problema de serialização é que a sua configuração de Kryo, embora próxima, não está bem:
Alteração:
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "hequn.spark.reconstruction.MyRegistrator")
val sc = new SparkContext(...
To:
val sparkConf = new SparkConf()
// ... set master, appname, etc, then:
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "hequn.spark.reconstruction.MyRegistrator")
val sc = new SparkContext(sparkConf)
EmApache Spark enquanto lidamos com arquivos de sequência , Temos de seguir estas técnicas:
-- Use Java equivalent Data Types in place of Hadoop data types. -- Spark Automatically converts the Writables into Java equivalent Types. Ex:- We have a sequence file "xyz", here key type is say Text and value is LongWritable. When we use this file to create an RDD, we need use their java equivalent data types i.e., String and Long respectively. val mydata = = sc.sequenceFile[String, Long]("path/to/xyz") mydata.collect