Como funciona o spark sc.o ficheiro de texto funciona em detalhe?
quero descobrir como sc.textfile
funciona em detalhe.
Encontrei o código fonte do ficheiro de texto em SparkContext.scala mas eles contêm tanta informação sobre scheduler, stage e tarefa submetida. O que eu quero é como sc.o ficheiro de texto lê ficheiros de hdfs e como sc.o ficheiro de texto usa caracteres especiais para corresponder a vários ficheiros.
Onde posso encontrar o código fonte?
3 answers
O Apache Spark usa a biblioteca cliente Hadoop para ler o ficheiro. Então você tem que ler o código fonte hadoop-client
para saber mais:
TextFile is a method of a org.Apache.faisca.Classe de SparkContext que lê um ficheiro de texto a partir de HDFS, um sistema de ficheiros local (disponível em todos os nós), ou qualquer URI do sistema de arquivos suportado pelo Hadoop, e retorná-lo como um RDD de Strings.
Sc.textFile(path,minpartions)
@localização do local do param para o ficheiro de texto num sistema de ficheiros suportado @ param minPartitions sugeriu o número mínimo de partições para o RDD resultante @return RDD das linhas do ficheiro de texto
Usa internamente o hadoopRDD (um RDD que fornece a funcionalidade principal para a leitura de dados armazenados no Hadoop)
Hadoop Rdd parece-se com isto
HadoopRDD(
sc, //Sparkcontext
confBroadcast, //A general Hadoop Configuration, or a subclass of it
Some(setInputPathsFunc),//Optional closure used to initialize any JobConf that HadoopRDD creates. inputFormatClass,
keyClass,
valueClass,
minPartitions)
No método do ficheiro de texto a que chamamos criar um hadoopRDD com algum valor hardcoded:
HadoopRDD(
sc, //Sparkcontext
confBroadcast, //A general Hadoop Configuration, or a subclass of it
Some(setInputPathsFunc),//Optional closure used to initialize any JobConf that HadoopRDD creates.
classOf[TextInputFormat],
classOf[LongWritable],
classOf[Text],
minPartitions)
Por causa destes valores codificados só somos capazes de ler os ficheiros de texto , por isso, se quisermos ler qualquer outro tipo de ficheiro, usamos o Hadooprd .
A função de cálculo no core\src\main\scala\org\apache\spark\rdd\HadoopRDD.scala
Aqui estão alguns códigos na função abaixo
var reader: RecordReader[K, V] = null
val inputFormat = getInputFormat(jobConf)
HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
context.stageId, theSplit.index, context.attemptNumber, jobConf)
reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener{ context => closeIfNeeded() }
val key: K = reader.createKey()
val value: V = reader.createValue()