Hadoop MapReduce: context.write is changing the values

I am new to Hadoop and to writing MapReduce jobs, and I have run into a problem where the reducer's context.write method seems to change the correct values into incorrect ones.

What is the MapReduce job supposed to do?

  • count the total number of words (int wordCount)
  • count the number of distinct words (int counter_dist)
  • count the number of words that start with "z" or "Z" (int counter_startZ)
  • count the number of words that appear fewer than 4 times (int counter_less4)

All of this must be done in a single MapReduce job.

Text file to be analyzed

Hello how zou zou zou zou how are you

Correct output:
wordCount = 9
counter_dist = 5
counter_startZ = 4
counter_less4 = 4
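As a sanity check, these expected numbers can be reproduced with a short plain-Java snippet (no Hadoop involved); the class name ExpectedCounts is just for illustration:

```java
import java.util.HashMap;
import java.util.Map;

public class ExpectedCounts {

    // Build a token -> frequency map for a whitespace-separated line
    static Map<String, Integer> frequencies(String text) {
        Map<String, Integer> freq = new HashMap<>();
        for (String w : text.split("\\s+")) {
            freq.merge(w, 1, Integer::sum);
        }
        return freq;
    }

    public static void main(String[] args) {
        Map<String, Integer> freq = frequencies("Hello how zou zou zou zou how are you");

        int wordCount = 0;      // total number of words
        int counter_startZ = 0; // words starting with z/Z, counting repeats
        int counter_less4 = 0;  // distinct words appearing fewer than 4 times
        for (Map.Entry<String, Integer> e : freq.entrySet()) {
            wordCount += e.getValue();
            if (e.getKey().toUpperCase().startsWith("Z")) {
                counter_startZ += e.getValue();
            }
            if (e.getValue() < 4) {
                counter_less4++;
            }
        }

        System.out.println("wordCount = " + wordCount);           // 9
        System.out.println("counter_dist = " + freq.size());      // 5
        System.out.println("counter_startZ = " + counter_startZ); // 4
        System.out.println("counter_less4 = " + counter_less4);   // 4
    }
}
```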

Mapper class

public class WordCountMapper extends Mapper <Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            String hasKey = itr.nextToken();
            word.set(hasKey);
            context.write(word, one);
        }

    }
}

Reducer class
In order to debug my code, I printed a bunch of statements to check my values at each point. The stdout log is available below.

public class WordCountReducer extends Reducer <Text, IntWritable, Text, IntWritable> {

    int wordCount = 0; // Total number of words
    int counter_dist = 0; // Number of distinct words in the corpus
    int counter_startZ = 0; // Number of words that start with letter Z
    int counter_less4 = 0; // Number of words that appear less than 4 times

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int repeatedWords = 0;
        System.out.println("###Reduce method starts");
        System.out.println("Values: wordCount:" + wordCount + " counter_dist:" + counter_dist + " counter_startZ:" + counter_startZ + " counter_less4:" + counter_less4 + " (start)");
        for (IntWritable val : values){
            System.out.println("Key: " + key.toString());
            repeatedWords++;
            wordCount += val.get();
            if(key.toString().startsWith("z") || key.toString().startsWith("Z")){
                counter_startZ++;
            }
            System.out.println("Values: wordCount:" + wordCount + " counter_dist:" + counter_dist + " counter_startZ:" + counter_startZ + " counter_less4:" + counter_less4 + " (end of loop)");
        }
        counter_dist++;

        if(repeatedWords < 4){
            counter_less4++;
        }

        System.out.println("Values: wordCount:" + wordCount + " counter_dist:" + counter_dist + " counter_startZ:" + counter_startZ + " counter_less4:" + counter_less4 + " repeatedWords:" + repeatedWords + " (end)");
        System.out.println("###Reduce method ends\n");
    }


    @Override
    public void cleanup(Context context) throws IOException, InterruptedException{
        System.out.println("###CLEANUP: wordCount: " + wordCount);
        System.out.println("###CLEANUP: counter_dist: " + counter_dist);
        System.out.println("###CLEANUP: counter_startZ: " + counter_startZ);
        System.out.println("###CLEANUP: counter_less4: " + counter_less4);

        context.write(new Text("Total words: "), new IntWritable(wordCount));
        context.write(new Text("Distinct words: "), new IntWritable(counter_dist));
        context.write(new Text("Starts with Z: "), new IntWritable(counter_startZ));
        context.write(new Text("Appears less than 4 times:"), new IntWritable(counter_less4));
    }


}

Stdout log, which I am using for debugging

###Reduce method starts
Values: wordCount:0 counter_dist:0 counter_startZ:0 counter_less4:0 (start)
Key: Hello
Values: wordCount:1 counter_dist:0 counter_startZ:0 counter_less4:0 (end of loop)
Values: wordCount:1 counter_dist:1 counter_startZ:0 counter_less4:1 repeatedWords:1 (end)
###Reduce method ends

###Reduce method starts
Values: wordCount:1 counter_dist:1 counter_startZ:0 counter_less4:1 (start)
Key: are
Values: wordCount:2 counter_dist:1 counter_startZ:0 counter_less4:1 (end of loop)
Values: wordCount:2 counter_dist:2 counter_startZ:0 counter_less4:2 repeatedWords:1 (end)
###Reduce method ends

###Reduce method starts
Values: wordCount:2 counter_dist:2 counter_startZ:0 counter_less4:2 (start)
Key: how
Values: wordCount:3 counter_dist:2 counter_startZ:0 counter_less4:2 (end of loop)
Key: how
Values: wordCount:4 counter_dist:2 counter_startZ:0 counter_less4:2 (end of loop)
Values: wordCount:4 counter_dist:3 counter_startZ:0 counter_less4:3 repeatedWords:2 (end)
###Reduce method ends

###Reduce method starts
Values: wordCount:4 counter_dist:3 counter_startZ:0 counter_less4:3 (start)
Key: you
Values: wordCount:5 counter_dist:3 counter_startZ:0 counter_less4:3 (end of loop)
Values: wordCount:5 counter_dist:4 counter_startZ:0 counter_less4:4 repeatedWords:1 (end)
###Reduce method ends

###Reduce method starts
Values: wordCount:5 counter_dist:4 counter_startZ:0 counter_less4:4 (start)
Key: zou
Values: wordCount:6 counter_dist:4 counter_startZ:1 counter_less4:4 (end of loop)
Key: zou
Values: wordCount:7 counter_dist:4 counter_startZ:2 counter_less4:4 (end of loop)
Key: zou
Values: wordCount:8 counter_dist:4 counter_startZ:3 counter_less4:4 (end of loop)
Key: zou
Values: wordCount:9 counter_dist:4 counter_startZ:4 counter_less4:4 (end of loop)
Values: wordCount:9 counter_dist:5 counter_startZ:4 counter_less4:4 repeatedWords:4 (end)
###Reduce method ends

###CLEANUP: wordCount: 9
###CLEANUP: counter_dist: 5
###CLEANUP: counter_startZ: 4
###CLEANUP: counter_less4: 4
From the log, it looks like all the values are correct and everything works fine. However, when I open the output directory in HDFS and read the "part-r-00000" file, the output written there by context.write is completely different.

Total words: 22
Distinct words: 4
Starts with Z: 0
Appears less than 4 times: 4
Author: Nicke011, 2018-03-07

1 answer

You should never rely on the cleanup() method for crucial program logic. The cleanup() method is called whenever a JVM is killed, so depending on the number of JVMs spawned and killed (which you cannot predict), your logic becomes volatile.

Move both the initialization and the writes to the context into the reduce() method, i.e.

int wordCount = 0; // Total number of words
int counter_dist = 0; // Number of distinct words in the corpus
int counter_startZ = 0; // Number of words that start with letter Z
int counter_less4 = 0; // Number of words that appear less than 4 times

And

    context.write(new Text("Total words: "), new IntWritable(wordCount));
    context.write(new Text("Distinct words: "), new IntWritable(counter_dist));
    context.write(new Text("Starts with Z: "), new IntWritable(counter_startZ));
    context.write(new Text("Appears less than 4 times:"), new IntWritable(counter_less4));

EDIT: Based on the OP's comments, it appears the whole logic is flawed.

Below is the code to achieve the desired result. Please note that I have not implemented setup() or cleanup(), because they are not needed at all.

Use counters to count what you are looking for. After the MapReduce job finishes, fetch the counters in the driver class.

For example, the total number of words and the words starting with "z" or "Z" can be counted in the map:

public class WordCountMapper extends Mapper <Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            String hasKey = itr.nextToken();
            word.set(hasKey);
            context.getCounter("my_counters", "TOTAL_WORDS").increment(1);
            if(hasKey.toUpperCase().startsWith("Z")){
                context.getCounter("my_counters", "Z_WORDS").increment(1);
            }
            context.write(word, one);
        }
    }
}

The number of distinct words and the words appearing less than 4 times can be counted in the reducer:

public class WordCountReducer extends Reducer <Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int wordCount = 0;
        context.getCounter("my_counters", "DISTINCT_WORDS").increment(1);
        for (IntWritable val : values){
            wordCount += val.get();
        }
        if(wordCount < 4){
            context.getCounter("my_counters", "WORDS_LESS_THAN_4").increment(1);
        }
    }
}
In the driver class, fetch the counters. The code below goes after the line where you submit the job:
CounterGroup group = job.getCounters().getGroup("my_counters");

for (Counter counter : group) {
   System.out.println(counter.getName() + "=" + counter.getValue());
}
Author: gyan, 2018-03-07 02:12:03