Wednesday, May 28, 2014

Hadoop MapReduce : Working with Counters to get an overview of the data

The Hadoop system records a set of metric counters for each job that it runs. For example, the number of input records mapped, the number of bytes it reads from or writes to HDFS, etc. To profile your applications, you may wish to record other values as well. For example, if the records sent into your mappers fall into two categories (call them "A" and "B"), you may wish to count the total number of A-records seen vs. the total number of B-records.
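
Besides the built-in metrics, custom counters can be addressed either by a group/name string pair (as done in the example below) or by a Java enum, in which case the enum's class name becomes the counter group and each constant a counter name. A minimal sketch of the enum style, using a hypothetical RecordType enum that is not part of this example:

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class EnumCounterMapper extends Mapper<Object, Text, NullWritable, NullWritable> {

    // Hypothetical counter enum: the class name becomes the counter group,
    // each constant becomes a counter name.
    public enum RecordType { A_RECORDS, B_RECORDS }

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        if (value.toString().startsWith("A")) {
            context.getCounter(RecordType.A_RECORDS).increment(1);
        } else {
            context.getCounter(RecordType.B_RECORDS).increment(1);
        }
    }
}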

Let's run a small analysis on the following dataset to figure out how many records have an age below 15 and how many have an age above 60.

"","Country","Region","Population","AgeGrp","FertilityRate","LifeExpectancy","ChildMortality","CellularSubscribers","LiteracyRate","GNI","PrimarySchoolEnrollmentMale","PrimarySchoolEnrollmentFemale"
"2","Albania","Europe",3162,21,1.75,74,16.7,96.39,NA,8820,NA,NA
"4","Andorra","Europe",78,15,NA,82,3.2,75.49,NA,NA,78.4,79.4
"8","Armenia","Europe",2969,62,1.74,71,16.4,103.57,99.6,6100,NA,NA
"10","Austria","Europe",8464,71,1.44,81,4,154.78,NA,42050,NA,NA
"11","Azerbaijan","Europe",9309,12,1.96,71,35.2,108.75,NA,8960,85.3,84.1
"16","Belarus","Europe",9405,9,1.47,71,5.2,111.88,NA,14460,NA,NA
"17","Belgium","Europe",11060,18,1.85,80,4.2,116.61,NA,39190,98.9,99.2
"22","Bosnia and Herzegovina","Europe",3834,71,1.26,76,6.7,84.52,97.9,9190,86.5,88.4
"26","Bulgaria","Europe",7278,9,1.51,74,12.1,140.68,NA,14160,99.3,99.7

Let's configure our Mapper class with the counter groups "Under_15" and "Over_60" and extract the AgeGrp field: if a record's value is greater than 60 we increment the Over_60 counter by 1, and if it is less than 15 we increment the Under_15 counter by 1.

package com.rajkrrsingh.counter;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CounterExMapper extends Mapper<Object, Text, NullWritable, NullWritable> {

    public static final String UNDER_FIFTEEN_GROUP = "Under_15";
    public static final String OVER_SIXTY_GROUP = "Over_60";

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        String[] tokens = value.toString().split(",");

        // Skip the header row and any record whose AgeGrp field is not numeric,
        // otherwise Integer.parseInt would fail the task.
        if (tokens.length < 5 || !tokens[4].matches("\\d+")) {
            return;
        }

        int ageGrp = Integer.parseInt(tokens[4]);
        if (ageGrp > 60) {
            context.getCounter(OVER_SIXTY_GROUP, "Over_60").increment(1);
        } else if (ageGrp < 15) {
            context.getCounter(UNDER_FIFTEEN_GROUP, "Under_15").increment(1);
        } else {
            context.getCounter(UNDER_FIFTEEN_GROUP, "NA").increment(1);
        }
    }
}

Here is our job driver, which runs the job and then prints the counts of records above 60 and below 15.

package com.rajkrrsingh.counter;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.mapreduce.Counter;

public class CounterDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] commandLineArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        if (commandLineArgs.length != 2) {
            System.err.println("Usage : CounterDriver <input> <output>");
            System.exit(2);
        }

        Job job = new Job(conf, "Count the records with age <15 and >60");
        job.setJarByClass(CounterDriver.class);
        job.setMapperClass(CounterExMapper.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(commandLineArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(commandLineArgs[1]));

        int rc = job.waitForCompletion(true) ? 0 : 1;

        if (rc == 0) {
            for (Counter counter : job.getCounters().getGroup(CounterExMapper.OVER_SIXTY_GROUP)) {
                System.out.println(counter.getDisplayName() + "\t" + counter.getValue());
            }

            for (Counter counter : job.getCounters().getGroup(CounterExMapper.UNDER_FIFTEEN_GROUP)) {
                System.out.println(counter.getDisplayName() + "\t" + counter.getValue());
            }
        }

        // The job writes no real output (the results live in the counters),
        // so the empty output directory is removed once the counts are printed.
        FileSystem.get(conf).delete(new Path(commandLineArgs[1]), true);

        System.exit(rc);
    }
}
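
If only a single counter value is needed rather than iterating the whole group, the Counters object also supports a direct lookup. A minimal sketch, assuming the same job object as in the driver above:

// Direct lookup of one counter instead of iterating its group.
long over60 = job.getCounters()
        .findCounter(CounterExMapper.OVER_SIXTY_GROUP, "Over_60")
        .getValue();
System.out.println("Records with AgeGrp > 60: " + over60);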

Tuesday, May 27, 2014

Text analytics : binning large data sets using MapReduce (MultipleOutputs)

Divide and conquer is the best way to tackle a big problem, and the same applies to a large dataset. Suppose we have a large dataset and want to run analytics over some keywords scattered through it; it is better to partition the data into bins based on those keywords and then run the analytics over each individual bin.
To provide binning of a dataset, the Hadoop MapReduce API ships with MultipleOutputs; here is the gist of its documentation to get familiar with it:
The MultipleOutputs class simplifies writing output data to multiple outputs.

Case one: writing to additional outputs other than the job default output. Each additional output, or named output, may be configured with its own OutputFormat, with its own key class and with its own value class.

Case two: writing data to different files provided by the user.

MultipleOutputs supports counters, by default they are disabled. The counters group is the MultipleOutputs class name. The names of the counters are the same as the output name. These count the number records written to each output name.
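
For completeness, here is a minimal sketch of case one (my own illustration, not part of the binning job below): an extra named output, hypothetically called "seq", is declared in the driver with its own OutputFormat and key/value classes, and a task writes to it through its MultipleOutputs instance (assumed to be the mos field shown in the mapper below).

// Driver side: declare an additional named output "seq" with its own
// OutputFormat and key/value classes (hypothetical example; requires
// SequenceFileOutputFormat and LongWritable imports).
MultipleOutputs.addNamedOutput(job, "seq",
        SequenceFileOutputFormat.class, LongWritable.class, Text.class);

// Task side: write to the named output through the MultipleOutputs instance.
mos.write("seq", new LongWritable(1L), value);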

Let's suppose the provided dataset contains the keywords keyword1, keyword2 and keyword3, and we want to create three bins based on those keywords.
Let's configure our mapper, which takes each line as input and looks for a match against the provided keywords; if it finds a match in the line, the mapper emits the line as a value through the MultipleOutputs object created when the mapper's setup method is called.

package com.rajkrrsingh.binning;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class BinMapper extends Mapper<Object, Text, Text, NullWritable> {

    private MultipleOutputs<Text, NullWritable> mos = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        mos = new MultipleOutputs<Text, NullWritable>(context);
    }

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        if (line.contains("keyword1")) {
            mos.write("bins", value, NullWritable.get(), "keyword1");
        } else if (line.contains("keyword2")) {
            mos.write("bins", value, NullWritable.get(), "keyword2");
        } else if (line.contains("keyword3")) {
            mos.write("bins", value, NullWritable.get(), "keyword3");
        } else {
            mos.write("bins", value, NullWritable.get(), "no_match");
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}

Now configure your driver class to set up the binning job:
package com.rajkrrsingh.binning;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class BinningJobDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] commandLineArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (commandLineArgs.length != 2) {
            System.err.println("Usage : BinningJobDriver <input> <output>");
            System.exit(2);
        }

        Job job = new Job(conf, "Binning Job");
        job.setJarByClass(BinningJobDriver.class);
        job.setMapperClass(BinMapper.class);
        job.setNumReduceTasks(0);

        TextInputFormat.setInputPaths(job, new Path(commandLineArgs[0]));
        TextOutputFormat.setOutputPath(job, new Path(commandLineArgs[1]));

        MultipleOutputs.addNamedOutput(job, "bins", TextOutputFormat.class, Text.class, NullWritable.class);
        MultipleOutputs.setCountersEnabled(job, true);

        int rc = job.waitForCompletion(true) ? 0 : 2;
        System.exit(rc);
    }
}
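
One optional refinement (my suggestion, not part of the original driver): since every record is routed through the named output "bins", the job's default output only ever produces empty part files; wrapping the output format with LazyOutputFormat suppresses them.

import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;

// Create the default part files lazily, i.e. only if something is actually
// written through the job's standard output collector (which this binning
// mapper never does), so the output directory holds only the bin files.
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);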



MapReduce : Get Distinct Records from a Big Data Set

In this tutorial we will do some data analytics with MapReduce: we will find the distinct records in a dataset.
Let's walk through our sample data, which looks as follows. It is sample data from a telecom company containing a subscriber id along with a tower id; we will run a MapReduce job on the dataset to find the distinct records:
subId=00001111911128052627,towerid=11232w34532543456345623453456984756894756,bytes=122112212212212218.4621702216543667E17
subId=00001111911128052639,towerid=11232w34532543456345623453456984756894756,bytes=122112212212212219.6726312167218586E17
subId=00001111911128052615,towerid=11232w34532543456345623453456984756894758,bytes=122112212212212216.9431647633139046E17
subId=00001111911128052615,towerid=11232w34532543456345623453456984756894757,bytes=122112212212212214.7836041833447418E17
subId=00001111911128052639,towerid=11232w34532543456345623453456984756894756,bytes=122112212212212219.0366596827240525E17
subId=00001111911128052619,towerid=11232w34532543456345623453456984756894756,bytes=122112212212212218.0686280014540467E17
subId=00001111911128052658,towerid=11232w34532543456345623453456984756894759,bytes=122112212212212216.9860890496178944E17
subId=00001111911128052652,towerid=11232w34532543456345623453456984756894756,bytes=122112212212212218.303981333116041E17

Here is my mapper code, which takes a line of data as input, extracts the tower id and emits it along with a NullWritable.

package com.rajkrrsingh.distinct;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DistinctMapper extends Mapper<Object, Text, Text, NullWritable> {

    private Text outTowerId = new Text();

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        // Each line looks like: subId=...,towerid=...,bytes=...
        String[] tokens = value.toString().split(",");
        String[] towerIdEx = tokens[1].split("=");
        String towerId = towerIdEx[1];
        if (towerId != null) {
            outTowerId.set(towerId);
            context.write(outTowerId, NullWritable.get());
        }
    }
}

The reducer is a plain identity reducer: because MapReduce groups identical keys together, each distinct tower id reaches the reduce method exactly once, so writing the key along with a NullWritable yields the distinct set.

package com.rajkrrsingh.distinct;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DistinctReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}

Now configure your driver class as follows:

package com.rajkrrsingh.distinct;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class DistinctDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] commandLineArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (commandLineArgs.length != 2) {
            System.err.println("Usage : DistinctDriver <input> <output>");
            System.exit(2);
        }

        Job job = new Job(conf, "Distinct Towers");
        job.setJarByClass(DistinctDriver.class);
        job.setMapperClass(DistinctMapper.class);
        job.setReducerClass(DistinctReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(commandLineArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(commandLineArgs[1]));
        int rc = job.waitForCompletion(true) ? 0 : 1;
        System.exit(rc);
    }
}
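
A small optional tweak (my suggestion, not in the original driver): because DistinctReducer's input and output types are identical, it can also be registered as a combiner so duplicate tower ids are already collapsed on the map side, reducing the data shuffled to the reducers.

// Optional: reuse the reducer as a combiner to cut down shuffle traffic.
job.setCombinerClass(DistinctReducer.class);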

Distributed Grep : Implementing distributed grep on a big data set using MapReduce

Grep is a command-line utility for searching plain-text data sets for lines matching a regular expression. Grep was originally developed for the Unix operating system, but is available today for all Unix-like systems. Its name comes from the ed command g/re/p (globally search a regular expression and print), which has the same effect: doing a global search with the regular expression and printing all matching lines.

In this tutorial I will demonstrate how to implement a distributed grep utility that works on files stored in HDFS using MapReduce. It is a very simple program that receives a regular expression as input, scans a set of input files and writes every matching line to an output file.

Let's create our mapper as follows:

package com.rajkrrsingh.disributedgrep;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DistributedGrepMapper extends Mapper<Object, Text, NullWritable, Text> {

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String txt = value.toString();
        String regex = context.getConfiguration().get("regex");

        if (txt.matches(regex)) {
            context.write(NullWritable.get(), value);
        }
    }
}
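
One caveat: String.matches() succeeds only when the regular expression matches the entire line, whereas grep prints lines that contain a match anywhere. A variant of the mapper (my sketch, not the original code) that compiles the pattern once in setup() and uses Matcher.find() for substring matching could look like this:

package com.rajkrrsingh.disributedgrep;

import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SubstringGrepMapper extends Mapper<Object, Text, NullWritable, Text> {

    private Pattern pattern;

    @Override
    protected void setup(Context context) {
        // Compile the regex once per task instead of on every map() call.
        pattern = Pattern.compile(context.getConfiguration().get("regex"));
    }

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // find() emulates grep: emit the line if the pattern occurs anywhere in it.
        if (pattern.matcher(value.toString()).find()) {
            context.write(NullWritable.get(), value);
        }
    }
}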

Now we need to implement the driver class, which will run the MapReduce job on the cluster and produce the output.
package com.rajkrrsingh.disributedgrep;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class DistributedGrepDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] commandLineArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (commandLineArgs.length != 3) {
            System.err.println("Usage : DistributedGrepDriver <regex> <input> <output>");
            System.exit(2);
        }

        conf.set("regex", commandLineArgs[0]);
        Job job = new Job(conf, "Distributed Grep Using MapReduce");
        job.setJarByClass(DistributedGrepDriver.class);
        job.setMapperClass(DistributedGrepMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(commandLineArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(commandLineArgs[2]));
        int rc = job.waitForCompletion(true) ? 0 : 1;
        System.exit(rc);
    }
}