Wednesday, December 16, 2015

Hadoop ChainMapper Example

Below is an example of ChainMapper in which input read from a file is passed through one mapper, then a second mapper, and finally a reducer that produces a more complex output. It also shows that you can override (rewrite) the data at each stage of the chain.


ChainMapperClass.java

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ChainMapperClass {

public static class TokenizerMapper extends
Mapper<Object, Text, TextPair, IntWritable> {

private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private static TextPair textPair = new TextPair();

public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
Text word2 = new Text();
word2.set("mapper1");
Text word3 = new Text();
word3.set("ggg");
textPair.set(word, word2, word3);
context.write(textPair, one);
}
}
}

public static class TokenizerMapper2 extends
Mapper<TextPair, IntWritable, TextPair, IntWritable> {

private static TextPair textPair = new TextPair();

public void map(TextPair key, IntWritable value, Context context)
throws IOException, InterruptedException {

Text text3 = new Text();
text3.set("fff");
textPair.set(key.getFirst(), key.getSecond(), text3);
context.write(textPair, value);

}

}

public static class IntSumReducer extends
Reducer<TextPair, IntWritable, TextPair, IntWritable> {
private IntWritable result = new IntWritable();

public void reduce(TextPair key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");

// //////////////Here add chaining code for mappers
Configuration mapAConf = new Configuration(false);
ChainMapper.addMapper(job, TokenizerMapper.class, Object.class,
Text.class, TextPair.class, IntWritable.class, mapAConf);

Configuration mapBConf = new Configuration(false);
ChainMapper.addMapper(job, TokenizerMapper2.class, TextPair.class,
IntWritable.class, TextPair.class, IntWritable.class, mapBConf);
// //////////////end of chaining for mappers
job.setJarByClass(ChainMapperClass.class);

// //job.setMapperClass(TokenizerMapper.class);

job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);

// //job.setMapOutputKeyClass(TextPair.class);
// //job.setMapOutputValueClass(IntWritable.class);

// job.setOutputKeyClass(TextPair.class);
// job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}


TextPair.java

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

/**
 * A {@link WritableComparable} key holding three {@link Text} fields, ordered
 * lexicographically by first, then second, then third.
 *
 * <p>All setters copy the incoming values rather than keeping references:
 * callers (such as the mappers in this example) reuse their {@code Text}
 * instances between calls, and holding references would let a later mutation
 * silently corrupt a previously-set pair.
 */
public class TextPair implements WritableComparable {

    private final Text first = new Text();
    private final Text second = new Text();
    private final Text third = new Text();

    /** Creates an empty pair (all three fields are empty strings). */
    public TextPair() {
    }

    /** Creates a pair holding copies of the given values. */
    public TextPair(Text first, Text second, Text third) {
        set(first, second, third);
    }

    /** Creates a pair from plain strings. */
    public TextPair(String first, String second, String third) {
        set(new Text(first), new Text(second), new Text(third));
    }

    public Text getFirst() {
        return first;
    }

    public Text getSecond() {
        return second;
    }

    public Text getThird() {
        return third;
    }

    /**
     * Copies the given values into this pair. The arguments themselves are
     * not retained, so the caller may freely reuse them.
     */
    public void set(Text first, Text second, Text third) {
        this.first.set(first);
        this.second.set(second);
        this.third.set(third);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
        third.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
        third.write(out);
    }

    @Override
    public String toString() {
        return first + " " + second + " " + third;
    }

    @Override
    public int hashCode() {
        // Formula kept as-is so hash partitioning is unchanged.
        return first.hashCode() * 163 + second.hashCode() + third.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (o instanceof TextPair) {
            TextPair tp = (TextPair) o;
            return first.equals(tp.first) && second.equals(tp.second)
                    && third.equals(tp.third);
        }
        return false;
    }

    @Override
    public int compareTo(Object o) {
        // Per the Comparable contract, a non-TextPair argument must raise
        // ClassCastException (the previous "return -1" broke antisymmetry:
        // both a.compareTo(b) and b.compareTo(a) could be negative).
        TextPair other = (TextPair) o;
        int cmp = first.compareTo(other.first);
        if (cmp == 0) {
            cmp = second.compareTo(other.second);
        }
        if (cmp == 0) {
            cmp = third.compareTo(other.third);
        }
        return cmp;
    }

}

No comments:

Post a Comment