Friday, December 18, 2015

Install and run Pig 0.15.0 on Hadoop 2.6.2 for Linux or Ubuntu

Below are instructions to download, install, and run Pig 0.15.0.

Download the Pig 0.15.0 tar.gz

Extract it and move it to a new location; on my machine that is /usr/local/pig-0.15.0:
      sudo mv pig-0.15.0 /usr/local/

Now set up the environment variables; below are the variables on my machine (Ubuntu 14 LTS or later) in .bashrc:
     #PIG VARS
     export PIG_HOME='/usr/local/pig-0.15.0'
     export PATH=$PIG_HOME/bin:$PATH
     export PIG_CLASSPATH=$HADOOP_INSTALL/etc/hadoop
     #PIG VARS END
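
To confirm the setup, open a new shell (or reload .bashrc) and try the standard Pig command-line options below; mapreduce mode assumes HDFS and YARN are already running:

     pig -version
     pig -x local        # start the Grunt shell against the local filesystem
     pig -x mapreduce    # default mode, runs against the Hadoop cluster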

Thursday, December 17, 2015

Hadoop ChainReducer example

Below is an example of how to use ChainReducer; you can override the data at each stage.

(It is important to note that ChainReducer.setReducer can be called only once per job.)

To run the jar (packing the classes into a jar is the easy way), use:
hadoop jar chainingreducer.jar  /user/hduser/wordcount/input /user/hduser/wordcount/output
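
If you still need to build the jar, one option (this relies on HADOOP_CLASSPATH pointing at the JDK's tools.jar, as set in the Hadoop .bashrc further down; TextPair.java is the helper class sketched after the listing) is:

hadoop com.sun.tools.javac.Main ChainingReducerClass.java TextPair.java
jar cf chainingreducer.jar *.class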


Below is the flow:
TokenizerMapper (data read from the file) --> TokenizerMapper2 (data modified) --> IntSumReducer (reduced on key and the sum calculated, i.e. word count) --> TokenizerMapper3 (data modified again)


ChainingReducerClass.java

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ChainingReducerClass {

    public static class TokenizerMapper extends
            Mapper<Object, Text, TextPair, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private static TextPair textPair = new TextPair();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                Text word2 = new Text();
                word2.set("mapper1");
                Text word3 = new Text();
                word3.set("ggg");
                textPair.set(word, word2, word3);
                context.write(textPair, one);
            }
        }
    }

    public static class TokenizerMapper2 extends
            Mapper<TextPair, IntWritable, TextPair, IntWritable> {

        private static TextPair textPair = new TextPair();

        public void map(TextPair key, IntWritable value, Context context)
                throws IOException, InterruptedException {
            Text text3 = new Text();
            text3.set("fff");
            textPair.set(key.getFirst(), key.getSecond(), text3);
            context.write(textPair, value);
        }
    }

    public static class IntSumReducer extends
            Reducer<TextPair, IntWritable, TextPair, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(TextPair key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static class TokenizerMapper3 extends
            Mapper<TextPair, IntWritable, TextPair, IntWritable> {

        private static TextPair textPair = new TextPair();

        public void map(TextPair key, IntWritable value, Context context)
                throws IOException, InterruptedException {
            Text text3 = new Text();
            text3.set("333");
            textPair.set(key.getFirst(), key.getSecond(), text3);
            context.write(textPair, value);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");

        // //////////////Here add chaining code for mappers
        Configuration mapAConf = new Configuration(false);
        ChainMapper.addMapper(job, TokenizerMapper.class, Object.class,
                Text.class, TextPair.class, IntWritable.class, mapAConf);

        Configuration mapBConf = new Configuration(false);
        ChainMapper.addMapper(job, TokenizerMapper2.class, TextPair.class,
                IntWritable.class, TextPair.class, IntWritable.class, mapBConf);
        // //////////////end of chaining for mappers
        job.setJarByClass(ChainingReducerClass.class);

        job.setCombinerClass(IntSumReducer.class);

        // there can only be one setReducer
        ChainReducer.setReducer(job, IntSumReducer.class, TextPair.class,
                IntWritable.class, TextPair.class, IntWritable.class,
                new Configuration(false));
        // reducer's results go to the mapper
        ChainReducer.addMapper(job, TokenizerMapper3.class, TextPair.class,
                IntWritable.class, TextPair.class, IntWritable.class, null);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
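
Note: both this listing and the ChainMapper example below use a custom TextPair key class (a three-field Writable with set(), getFirst() and getSecond()) whose source is not included in this post. A minimal sketch of what such a class could look like (my assumption, not necessarily the original implementation):

TextPair.java (sketch)

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

// Hypothetical TextPair: three Text fields serialized in order.
public class TextPair implements WritableComparable<TextPair> {

    private Text first = new Text();
    private Text second = new Text();
    private Text third = new Text();

    public void set(Text first, Text second, Text third) {
        this.first.set(first);
        this.second.set(second);
        this.third.set(third);
    }

    public Text getFirst() {
        return first;
    }

    public Text getSecond() {
        return second;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // serialization order must match readFields()
        first.write(out);
        second.write(out);
        third.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
        third.readFields(in);
    }

    @Override
    public int compareTo(TextPair other) {
        int cmp = first.compareTo(other.first);
        if (cmp != 0) {
            return cmp;
        }
        cmp = second.compareTo(other.second);
        return cmp != 0 ? cmp : third.compareTo(other.third);
    }

    @Override
    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (o instanceof TextPair) {
            TextPair tp = (TextPair) o;
            return first.equals(tp.first) && second.equals(tp.second)
                    && third.equals(tp.third);
        }
        return false;
    }

    @Override
    public String toString() {
        return first + "\t" + second + "\t" + third;
    }
}

Any WritableComparable with those accessors and a deterministic ordering will do, since it is only used as a map/reduce key here.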

A few useful Hadoop 2.6.2 commands

Below are a few useful Hadoop (2.6.2) commands:

stop-yarn.sh

stop-dfs.sh

hadoop namenode -format

start-dfs.sh

#will list processes running in hadoop
jps

hdfs dfs -mkdir /user

hdfs dfs -mkdir /user/hduser

hdfs dfs -mkdir /user/hduser/wordcount

hdfs dfs -mkdir /user/hduser/wordcount/input

cd /usr/local/hadoop/temp_hduser

#copy file from local to hdfs
hdfs dfs -put input1.txt /user/hduser/wordcount/input/file0
hdfs dfs -put input1.txt /user/hduser/wordcount/input/file1

start-yarn.sh

#For removing the output dir if it already exists (-rmr is deprecated; hdfs dfs -rm -r also works)
hdfs dfs -rmr /user/hduser/wordcount/output

#Run your jar; it will read everything from the input dir. The command below will not work if YARN is not started :)
hadoop jar chainingmapper.jar  /user/hduser/wordcount/input /user/hduser/wordcount/output
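
#Once the job finishes, the reducer output ends up in part files under the output dir (standard MapReduce naming); to inspect it:
hdfs dfs -ls /user/hduser/wordcount/output
hdfs dfs -cat /user/hduser/wordcount/output/part-r-*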

Wednesday, December 16, 2015

Hadoop ChainMapper Example

Below is an example of ChainMapper where the input is read from a file, goes to another mapper, and then to a reducer with a more complex output. It shows that you can also override data at each stage.


ChainMapperClass.java

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ChainMapperClass {

    public static class TokenizerMapper extends
            Mapper<Object, Text, TextPair, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private static TextPair textPair = new TextPair();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                Text word2 = new Text();
                word2.set("mapper1");
                Text word3 = new Text();
                word3.set("ggg");
                textPair.set(word, word2, word3);
                context.write(textPair, one);
            }
        }
    }

    public static class TokenizerMapper2 extends
            Mapper<TextPair, IntWritable, TextPair, IntWritable> {

        private static TextPair textPair = new TextPair();

        public void map(TextPair key, IntWritable value, Context context)
                throws IOException, InterruptedException {
            Text text3 = new Text();
            text3.set("fff");
            textPair.set(key.getFirst(), key.getSecond(), text3);
            context.write(textPair, value);
        }
    }

    public static class IntSumReducer extends
            Reducer<TextPair, IntWritable, TextPair, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(TextPair key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");

        // //////////////Here add chaining code for mappers
        Configuration mapAConf = new Configuration(false);
        ChainMapper.addMapper(job, TokenizerMapper.class, Object.class,
                Text.class, TextPair.class, IntWritable.class, mapAConf);

        Configuration mapBConf = new Configuration(false);
        ChainMapper.addMapper(job, TokenizerMapper2.class, TextPair.class,
                IntWritable.class, TextPair.class, IntWritable.class, mapBConf);
        // //////////////end of chaining for mappers
        job.setJarByClass(ChainMapperClass.class);

        // //job.setMapperClass(TokenizerMapper.class);

        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);

        // //job.setMapOutputKeyClass(TextPair.class);
        // //job.setMapOutputValueClass(IntWritable.class);

        // job.setOutputKeyClass(TextPair.class);
        // job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}


How to Install Hadoop 2.6.2 on Ubuntu or Linux

If you are looking to install and run Hadoop 2.6.2 (pseudo-distributed), follow the instructions below:

Download Hadoop 2.6.2


Creating user and Adding user to group
$ sudo addgroup hadoop
$ sudo adduser --ingroup hadoop hduser

If you don't have ssh, install it:

$ sudo apt-get install ssh
$ sudo apt-get install rsync

My .bashrc (change it as per your needs):
#HADOOP VARIABLES START
export JAVA_HOME=/usr/lib/jvm/java-8-oracle
export PATH=$PATH:/usr/lib/jvm/java-8-oracle/bin
export HADOOP_INSTALL=/usr/local/hadoop
export PATH=$PATH:$HADOOP_INSTALL/bin
export PATH=$PATH:$HADOOP_INSTALL/sbin
export HADOOP_MAPRED_HOME=$HADOOP_INSTALL
export HADOOP_COMMON_HOME=$HADOOP_INSTALL
export HADOOP_HDFS_HOME=$HADOOP_INSTALL
export YARN_HOME=$HADOOP_INSTALL
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_INSTALL/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_INSTALL/lib/native"
export HADOOP_CLASSPATH=$JAVA_HOME/lib/tools.jar
#HADOOP VARIABLES END
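
Reload the shell so the new variables take effect, then check that the hadoop command is picked up (hadoop version is a standard command):

source ~/.bashrc
hadoop version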

Configuration

Use the following:
etc/hadoop/core-site.xml:
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>
etc/hadoop/hdfs-site.xml:
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>
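
If the daemons later complain that JAVA_HOME is not set, it can also be set explicitly in etc/hadoop/hadoop-env.sh (same JDK path as in the .bashrc above; adjust for your machine):

# in etc/hadoop/hadoop-env.sh
export JAVA_HOME=/usr/lib/jvm/java-8-oracle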

Setup passphraseless ssh

Now check that you can ssh to the localhost without a passphrase:
  $ ssh localhost
If you cannot ssh to localhost without a passphrase, execute the following commands:
  $ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
  $ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys


To run Hadoop:

The following instructions are to run a MapReduce job locally. If you want to execute a job on YARN, see YARN on Single Node.
  1. Format the filesystem:
      $ bin/hdfs namenode -format
  2. Start NameNode daemon and DataNode daemon:
      $ sbin/start-dfs.sh
    The hadoop daemon log output is written to the $HADOOP_LOG_DIR directory (defaults to $HADOOP_HOME/logs).
  3. Browse the web interface for the NameNode; by default it is available at:
    • NameNode - http://localhost:50070/
  4. Make the HDFS directories required to execute MapReduce jobs:
      $ bin/hdfs dfs -mkdir /user
      $ bin/hdfs dfs -mkdir /user/<username>
  5. Copy the input files into the distributed filesystem:
      $ bin/hdfs dfs -put etc/hadoop input
Now setup and start Yarn to submit a job:
You can run a MapReduce job on YARN in a pseudo-distributed mode by setting a few parameters and running ResourceManager daemon and NodeManager daemon in addition.
The following instructions assume that steps 1-4 above have already been executed.
  1. Configure parameters as follows:
    etc/hadoop/mapred-site.xml:
    <configuration>
        <property>
            <name>mapreduce.framework.name</name>
            <value>yarn</value>
        </property>
    </configuration>
    etc/hadoop/yarn-site.xml:
    <configuration>
        <property>
            <name>yarn.nodemanager.aux-services</name>
            <value>mapreduce_shuffle</value>
        </property>
    </configuration>
  2. Start ResourceManager daemon and NodeManager daemon:
      $ sbin/start-yarn.sh
  3. Browse the web interface for the ResourceManager; by default it is available at:
    • ResourceManager - http://localhost:8088/
  4. Run a MapReduce job.
  5. When you're done, stop the daemons with:
      $ sbin/stop-yarn.sh
Now submit a job:
  1. $ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.2.jar grep input output 'dfs[a-z.]+'
  2. Examine the output files:
    Copy the output files from the distributed filesystem to the local filesystem and examine them:
      $ bin/hdfs dfs -get output output
      $ cat output/*
    or
    View the output files on the distributed filesystem:
      $ bin/hdfs dfs -cat output/*
  3. When you're done, stop the daemons with:
      $ sbin/stop-dfs.sh
Try the jps command to see what is running; it should show NameNode, DataNode, and SecondaryNameNode.
If the NameNode is not running, you need to format it again (search for "format" in this blog).
If the DataNode is not running, you may need to delete the files in the tmp dirs, or it could be a permissions issue. To see the errors, try the "hadoop datanode" command.