Thursday, December 17, 2015

Hadoop ChainReducer example

Below is an example of how to use ChainReducer; the data can be modified at each stage of the chain.

(Note that ChainReducer.setReducer can be called only once per job; any stage that follows the reducer is added with ChainReducer.addMapper.)

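To run the job, package the classes into a jar (this is the easiest way) and use the command below. It assumes the jar's manifest names ChainingReducerClass as its main class; otherwise, pass the class name right after the jar file name.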
hadoop jar chainingreducer.jar  /user/hduser/wordcount/input /user/hduser/wordcount/output


Below is the flow:
TokenizerMapper (data read from file) --> TokenizerMapper2 (data modified) --> IntSumReducer (reduced on key and sum calculated, i.e. word count) --> TokenizerMapper3 (data modified again)


ChainingReducerClass.java

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Word-count job that demonstrates chaining mappers before and after a
 * single reducer with {@link ChainMapper} and {@link ChainReducer}.
 *
 * <p>Pipeline configured by {@link #main(String[])}:
 * {@code TokenizerMapper -> TokenizerMapper2 -> IntSumReducer -> TokenizerMapper3}.
 * Every stage uses a {@code TextPair} key and an {@link IntWritable} value;
 * each mapper stage rewrites part of the key so its effect is visible in the
 * output.
 */
public class ChainingReducerClass {

    /**
     * First map stage: splits each input line on whitespace and emits
     * {@code (token, "mapper1", "ggg") -> 1} for every token.
     */
    public static class TokenizerMapper extends
            Mapper<Object, Text, TextPair, IntWritable> {

        private static final IntWritable ONE = new IntWritable(1);

        // Reusable output objects. They are instance fields (not static) so
        // that separate mapper instances never share mutable state.
        private final Text word = new Text();
        private final Text stageLabel = new Text("mapper1");
        private final Text marker = new Text("ggg");
        private final TextPair outKey = new TextPair();

        /**
         * Tokenizes one line and writes one record per token.
         *
         * @param key     input key supplied by the input format (unused)
         * @param value   one line of input text
         * @param context sink for the emitted {@code (TextPair, 1)} records
         */
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer tokens = new StringTokenizer(value.toString());
            while (tokens.hasMoreTokens()) {
                word.set(tokens.nextToken());
                outKey.set(word, stageLabel, marker);
                context.write(outKey, ONE);
            }
        }
    }

    /**
     * Second map stage: keeps the first two key components and replaces the
     * third with {@code "fff"}; the value passes through unchanged.
     */
    public static class TokenizerMapper2 extends
            Mapper<TextPair, IntWritable, TextPair, IntWritable> {

        private final Text marker = new Text("fff");
        private final TextPair outKey = new TextPair();

        /**
         * Rewrites the third key component and forwards the record.
         *
         * @param key     key produced by the previous stage
         * @param value   count carried with the key
         * @param context sink for the rewritten record
         */
        @Override
        public void map(TextPair key, IntWritable value, Context context)
                throws IOException, InterruptedException {
            outKey.set(key.getFirst(), key.getSecond(), marker);
            context.write(outKey, value);
        }
    }

    /**
     * Sums all values that share a key. Registered by {@code main} both as
     * the combiner and as the chain's reducer.
     */
    public static class IntSumReducer extends
            Reducer<TextPair, IntWritable, TextPair, IntWritable> {

        private final IntWritable result = new IntWritable();

        /**
         * Emits {@code key -> sum(values)}.
         *
         * @param key     the grouped key
         * @param values  all counts recorded for {@code key}
         * @param context sink for the summed record
         */
        @Override
        public void reduce(TextPair key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    /**
     * Post-reduce map stage: keeps the first two key components and replaces
     * the third with {@code "333"}; the value passes through unchanged.
     */
    public static class TokenizerMapper3 extends
            Mapper<TextPair, IntWritable, TextPair, IntWritable> {

        private final Text marker = new Text("333");
        private final TextPair outKey = new TextPair();

        /**
         * Rewrites the third key component and forwards the record.
         *
         * @param key     key produced by the reducer
         * @param value   summed count for the key
         * @param context sink for the rewritten record
         */
        @Override
        public void map(TextPair key, IntWritable value, Context context)
                throws IOException, InterruptedException {
            outKey.set(key.getFirst(), key.getSecond(), marker);
            context.write(outKey, value);
        }
    }

    /**
     * Configures and runs the chained job, then exits the JVM.
     *
     * @param args {@code args[0]} is the input path and {@code args[1]} the
     *             output path; any further arguments are ignored
     * @throws Exception if the job cannot be configured or run
     */
    public static void main(String[] args) throws Exception {
        // Fail with a readable message instead of an
        // ArrayIndexOutOfBoundsException when the paths are missing.
        if (args.length < 2) {
            System.err.println(
                    "Usage: ChainingReducerClass <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");

        // //////////////Here add chaining code for mappers
        Configuration mapAConf = new Configuration(false);
        ChainMapper.addMapper(job, TokenizerMapper.class, Object.class,
                Text.class, TextPair.class, IntWritable.class, mapAConf);

        Configuration mapBConf = new Configuration(false);
        ChainMapper.addMapper(job, TokenizerMapper2.class, TextPair.class,
                IntWritable.class, TextPair.class, IntWritable.class, mapBConf);
        // //////////////end of chaining for mappers
        job.setJarByClass(ChainingReducerClass.class);

        job.setCombinerClass(IntSumReducer.class);

        // there can only be one setReducer call per job
        ChainReducer.setReducer(job, IntSumReducer.class, TextPair.class,
                IntWritable.class, TextPair.class, IntWritable.class,
                new Configuration(false));
        // the reducer's results go to this mapper
        ChainReducer.addMapper(job, TokenizerMapper3.class, TextPair.class,
                IntWritable.class, TextPair.class, IntWritable.class, null);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

TextPair.java

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

/**
 * A composite key made of three {@link Text} components (despite the name,
 * it holds a triple, not a pair).
 *
 * <p>The components are serialized in the order first, second, third, and
 * instances are ordered by comparing the components in that same order.
 *
 * <p>Note that the constructors taking {@code Text} arguments and
 * {@link #set(Text, Text, Text)} store the given objects by reference; they
 * do not copy them. Later changes to those {@code Text} objects are therefore
 * visible through this key.
 */
public class TextPair implements WritableComparable<TextPair> {

    private Text first;
    private Text second;
    private Text third;

    /**
     * Creates a key that wraps the given components (stored by reference).
     *
     * @param first  first component
     * @param second second component
     * @param third  third component
     */
    public TextPair(Text first, Text second, Text third) {
        // Assign directly rather than calling the overridable set() from a
        // constructor.
        this.first = first;
        this.second = second;
        this.third = third;
    }

    /** Creates a key whose three components are empty {@link Text} objects. */
    public TextPair() {
        this(new Text(), new Text(), new Text());
    }

    /**
     * Creates a key from three strings, each wrapped in a new {@link Text}.
     *
     * @param first  first component
     * @param second second component
     * @param third  third component
     */
    public TextPair(String first, String second, String third) {
        this(new Text(first), new Text(second), new Text(third));
    }

    /** @return the first component (the stored object, not a copy) */
    public Text getFirst() {
        return first;
    }

    /** @return the second component (the stored object, not a copy) */
    public Text getSecond() {
        return second;
    }

    /** @return the third component (the stored object, not a copy) */
    public Text getThird() {
        return third;
    }

    /**
     * Replaces all three components. The arguments are stored by reference.
     *
     * @param first  new first component
     * @param second new second component
     * @param third  new third component
     */
    public void set(Text first, Text second, Text third) {
        this.first = first;
        this.second = second;
        this.third = third;
    }

    /**
     * Reads the three components from {@code in}, in the order written by
     * {@link #write(DataOutput)}, into the currently held {@code Text} objects.
     *
     * @param in source to deserialize from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
        third.readFields(in);
    }

    /**
     * Writes the three components to {@code out} in the order first, second,
     * third.
     *
     * @param out sink to serialize to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
        third.write(out);
    }

    /** @return the three components separated by single spaces */
    @Override
    public String toString() {
        return first + " " + second + " " + third;
    }

    /** @return a hash combining the hashes of all three components */
    @Override
    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode() + third.hashCode();
    }

    /**
     * @param o object to compare with
     * @return {@code true} if {@code o} is a {@code TextPair} whose three
     *         components all equal this key's components
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TextPair)) {
            return false;
        }
        TextPair that = (TextPair) o;
        return first.equals(that.first) && second.equals(that.second)
                && third.equals(that.third);
    }

    /**
     * Orders keys by first component, then second, then third.
     *
     * @param other the key to compare with
     * @return a negative, zero, or positive value as this key sorts before,
     *         equal to, or after {@code other}
     */
    @Override
    public int compareTo(TextPair other) {
        int cmp = first.compareTo(other.first);
        if (cmp != 0) {
            return cmp;
        }
        cmp = second.compareTo(other.second);
        if (cmp != 0) {
            return cmp;
        }
        return third.compareTo(other.third);
    }

}


No comments:

Post a Comment