Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
457 views
in Technique[技术] by (71.8m points)

mapreduce - Output a list from a Hadoop Map Reduce job using custom writable

I'm trying to create a simple map reduce job by changing the wordcount example given by hadoop.

I'm trying to out put a list instead of a count of the words. The wordcount example gives the following ouput

hello 2
world 2

I'm trying to get it to output as a list, which will form the basis of future work

hello 1 1
world 1 1

I think I'm on the right track but I'm having trouble writing the list. Instead of the above, I'm getting

Hello   foo.MyArrayWritable@61250ff2
World   foo.MyArrayWritable@483a0ab1

Here's my MyArrayWritable. I put a sys out in the write(DataOuptut arg0) but it never output anything so I think that method might not be called and I don't know why.

class MyArrayWritable extends ArrayWritable{

public MyArrayWritable(Class<? extends Writable> valueClass, Writable[] values) {
    super(valueClass, values);
}
public MyArrayWritable(Class<? extends Writable> valueClass) {
    super(valueClass);
}

@Override
public IntWritable[] get() {
    return (IntWritable[]) super.get();
}

@Override
public void write(DataOutput arg0) throws IOException {
    for(IntWritable i : get()){
        i.write(arg0);
    }
}
}

EDIT - adding more source code

public class WordCount {

public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
} 

public static class Reduce extends Reducer<Text, IntWritable, Text, MyArrayWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        ArrayList<IntWritable> list = new ArrayList<IntWritable>();    
        for (IntWritable val : values) {
            list.add(val);
        }
        context.write(key, new MyArrayWritable(IntWritable.class, list.toArray(new IntWritable[list.size()])));
    }
}

public static void main(String[] args) throws Exception {
    if(args == null || args.length == 0)
        args = new String[]{"./wordcount/input","./wordcount/output"};
    Path p = new Path(args[1]);
    FileSystem fs = FileSystem.get(new Configuration());
    fs.exists(p);
    fs.delete(p, true);

    Configuration conf = new Configuration();

    Job job = new Job(conf, "wordcount");
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setJarByClass(WordCount.class);
    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.waitForCompletion(true);
}

}

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

You have a 'bug' in your reducer - the value iterator re-uses the same IntWritable throughout the loop, so you should wrap the value being added to the list as follows:

public void reduce(Text key, Iterable<IntWritable> values, Context context)
                                      throws IOException, InterruptedException {
    ArrayList<IntWritable> list = new ArrayList<IntWritable>();    
    for (IntWritable val : values) {
        list.add(new IntWritable(val));
    }
    context.write(key, new MyArrayWritable(IntWritable.class, list.toArray(new IntWritable[list.size()])));
}

This isn't actually a problem as you're using an array list and your mapper only outputs a single value (one) but is something that may trip you up if you ever extend this code.

You also need to define in your job that your map and reducer output types are different:

// map output types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// reducer output types

job.setOutputValueClass(Text.class);
job.setOutputValueClass(MyArrayWritable.class);

You might want to explicitly define the number of reducers (which may be why you never see your sysouts being written to the task logs, especially if your cluster admin has defined the default number to be 0):

job.setNumReduceTasks(1);

Your using the default Text output format, which calls toString() on the output key and value pairs - MyArrayWritable doesn't have an overridden toString method so you should put one in your MyArrayWritable:

@Override
public String toString() {
  return Arrays.toString(get());
}

Finally remove the overridden write method from MyArrayWritable - this is not a valid implementation compatible with the complimentary readFields method. you don't need to override this method but if you do (say you want to see a sysout to verify it's being called) then do something like this instead:

@Override
public void write(DataOutput arg0) throws IOException {
  System.out.println("write method called");
  super.write(arg0);
}

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...