Wednesday, May 28, 2014

Hadoop MapReduce: Working with Counters to Get an Overview of the Data

The Hadoop system records a set of metric counters for each job that it runs. For example, the number of input records mapped, the number of bytes it reads from or writes to HDFS, etc. To profile your applications, you may wish to record other values as well. For example, if the records sent into your mappers fall into two categories (call them "A" and "B"), you may wish to count the total number of A-records seen vs. the total number of B-records.

Let's run a small analysis on the following dataset to find out how many records have an age below 15 and how many have an age above 60.

"","Country","Region","Population","AgeGrp","FertilityRate","LifeExpectancy","ChildMortality","CellularSubscribers","LiteracyRate","GNI","PrimarySchoolEnrollmentMale","PrimarySchoolEnrollmentFemale"
"2","Albania","Europe",3162,21,1.75,74,16.7,96.39,NA,8820,NA,NA
"4","Andorra","Europe",78,15,NA,82,3.2,75.49,NA,NA,78.4,79.4
"8","Armenia","Europe",2969,62,1.74,71,16.4,103.57,99.6,6100,NA,NA
"10","Austria","Europe",8464,71,1.44,81,4,154.78,NA,42050,NA,NA
"11","Azerbaijan","Europe",9309,12,1.96,71,35.2,108.75,NA,8960,85.3,84.1
"16","Belarus","Europe",9405,9,1.47,71,5.2,111.88,NA,14460,NA,NA
"17","Belgium","Europe",11060,18,1.85,80,4.2,116.61,NA,39190,98.9,99.2
"22","Bosnia and Herzegovina","Europe",3834,71,1.26,76,6.7,84.52,97.9,9190,86.5,88.4
"26","Bulgaria","Europe",7278,9,1.51,74,12.1,140.68,NA,14160,99.3,99.7

Let's configure our Mapper class with the counter groups "Under_15" and "Over_60". For each record the mapper extracts the AgeGrp field: if the value is greater than 60 it increments the "Over_60" counter by 1, and if the value is less than 15 it increments the "Under_15" counter by 1. Every other record is tallied under an "NA" counter.

package com.rajkrrsingh.counter;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper that emits no key/value output and only updates job counters based on
 * the AgeGrp column (fifth comma-separated field) of each input line.
 *
 * <ul>
 *   <li>age &gt; 60: increments counter "Over_60" in group {@link #OVER_SIXTY_GROUP}</li>
 *   <li>age &lt; 15: increments counter "Under_15" in group {@link #UNDER_FIFTEEN_GROUP}</li>
 *   <li>anything else: increments counter "NA" in group {@link #UNDER_FIFTEEN_GROUP}</li>
 * </ul>
 *
 * "Anything else" includes ages between 15 and 60 as well as lines whose age
 * column is missing or not an integer (for example a CSV header row), so a
 * malformed line no longer fails the task with an unchecked exception.
 */
public class CounterExMapper extends Mapper<Object, Text, NullWritable, NullWritable> {

    public static final String UNDER_FIFTEEN_GROUP = "Under_15";
    public static final String OVER_SIXTY_GROUP = "Over_60";

    /** Zero-based position of the AgeGrp column after splitting a line on commas. */
    private static final int AGE_GROUP_COLUMN = 4;

    /**
     * Classifies one input line by age and bumps the matching counter.
     *
     * @param key     input key supplied by the input format; not used
     * @param value   one line of the comma-separated input
     * @param context task context used to look up the counters
     * @throws IOException          declared by the overridden method
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        String[] tokens = value.toString().split(",");

        // Parse once; null means the age column was absent or not an integer.
        Integer age = parseAge(tokens);

        if (age != null && age > 60) {
            context.getCounter(OVER_SIXTY_GROUP, "Over_60").increment(1);
        } else if (age != null && age < 15) {
            context.getCounter(UNDER_FIFTEEN_GROUP, "Under_15").increment(1);
        } else {
            context.getCounter(UNDER_FIFTEEN_GROUP, "NA").increment(1);
        }
    }

    /**
     * Reads the age column from an already-split line.
     *
     * @param tokens the comma-separated fields of one input line
     * @return the parsed age, or {@code null} when the line has too few fields
     *         or the field is not a valid integer
     */
    private static Integer parseAge(String[] tokens) {
        if (tokens.length <= AGE_GROUP_COLUMN) {
            return null;
        }
        try {
            return Integer.parseInt(tokens[AGE_GROUP_COLUMN].trim());
        } catch (NumberFormatException notAnInteger) {
            // Header rows and non-numeric values end up here; the caller treats them as "NA".
            return null;
        }
    }
}

Here is our job driver, which prints an overview of the records with an age greater than 60 and less than 15.

package com.rajkrrsingh.counter;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.mapreduce.Counter;

/**
 * Command-line driver that runs {@link CounterExMapper} over an input path and
 * prints the counters of the two age groups once the job succeeds.
 *
 * <p>The mapper writes no records, so the job is configured as map-only and the
 * output directory is removed after the run.
 */
public class CounterDriver {

    /**
     * Configures, submits and waits for the counting job.
     *
     * <p>Exits with status 2 on wrong usage, 0 when the job succeeds and 1 when it fails.
     *
     * @param args generic Hadoop options followed by {@code <input> <output>}
     * @throws IOException            if job setup, counter retrieval or output cleanup fails
     * @throws InterruptedException   if waiting for the job is interrupted
     * @throws ClassNotFoundException if a job class cannot be resolved
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] commandLineArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        if (commandLineArgs.length != 2) {
            System.err.println("Usage : CounterDriver <input> <output>");
            System.exit(2);
        }

        Path inputPath = new Path(commandLineArgs[0]);
        Path outputPath = new Path(commandLineArgs[1]);

        // Factory method replaces the deprecated Job(Configuration, String) constructor.
        Job job = Job.getInstance(conf, "Count the record with age <15 and >60");
        job.setJarByClass(CounterDriver.class);
        job.setMapperClass(CounterExMapper.class);
        // Only counters matter here; skip the shuffle/reduce phase entirely.
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        int rc = job.waitForCompletion(true) ? 0 : 1;

        if (rc == 0) {
            printCounterGroup(job, CounterExMapper.OVER_SIXTY_GROUP);
            printCounterGroup(job, CounterExMapper.UNDER_FIFTEEN_GROUP);
        }

        // Resolve the filesystem from the path itself so cleanup also works when the
        // output location is not on the configured default filesystem.
        FileSystem outputFs = outputPath.getFileSystem(conf);
        outputFs.delete(outputPath, true);

        System.exit(rc);
    }

    /**
     * Prints every counter of one group as {@code <display name>\t<value>} on stdout.
     *
     * @param job       the finished job whose counters are read
     * @param groupName name of the counter group to print
     * @throws IOException if the counters cannot be fetched from the job
     */
    private static void printCounterGroup(Job job, String groupName) throws IOException {
        for (Counter counter : job.getCounters().getGroup(groupName)) {
            System.out.println(counter.getDisplayName() + "\t" + counter.getValue());
        }
    }
}