In this tutorial we will do some analytics on a data set using MapReduce: we will find the distinct records in the data set.
Let's walk through our sample data, which looks as follows. It is sample data from a telecom company containing a subscriber ID along with a tower ID; we will run a MapReduce job on the data set to find the distinct records:
Here is my mapper code, which takes a line of data as input and extracts and emits the towerId along with a NullWritable.
The reducer code is a plain identity reducer that writes the key along with the NullWritable values.
Now configure your driver class as follows:
Let's walk through our sample data, which looks as follows. It is sample data from a telecom company containing a subscriber ID along with a tower ID; we will run a MapReduce job on the data set to find the distinct records:
subId=00001111911128052627,towerid=11232w34532543456345623453456984756894756,bytes=122112212212212218.4621702216543667E17 subId=00001111911128052639,towerid=11232w34532543456345623453456984756894756,bytes=122112212212212219.6726312167218586E17 subId=00001111911128052615,towerid=11232w34532543456345623453456984756894758,bytes=122112212212212216.9431647633139046E17 subId=00001111911128052615,towerid=11232w34532543456345623453456984756894757,bytes=122112212212212214.7836041833447418E17 subId=00001111911128052639,towerid=11232w34532543456345623453456984756894756,bytes=122112212212212219.0366596827240525E17 subId=00001111911128052619,towerid=11232w34532543456345623453456984756894756,bytes=122112212212212218.0686280014540467E17 subId=00001111911128052658,towerid=11232w34532543456345623453456984756894759,bytes=122112212212212216.9860890496178944E17 subId=00001111911128052652,towerid=11232w34532543456345623453456984756894756,bytes=122112212212212218.303981333116041E17
Here is my mapper code, which takes a line of data as input and extracts and emits the towerId along with a NullWritable.
package com.rajkrrsingh.distinct;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Mapper that extracts the towerId from each input record and emits it as the
 * key with a NullWritable value, so the shuffle groups identical tower ids
 * and the reducer can output the distinct set.
 *
 * <p>Expected record layout: {@code subId=...,towerid=...,bytes=...}
 */
public class DistinctMapper extends Mapper<Object, Text, Text, NullWritable> {

    // Reused output key to avoid allocating a new Text per record.
    private final Text outTowerId = new Text();

    /**
     * @param key     input offset (unused)
     * @param value   one comma-separated record line
     * @param context MapReduce context used to emit (towerId, NullWritable)
     */
    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] tokens = value.toString().split(",");
        // Guard against malformed lines: the original code threw
        // ArrayIndexOutOfBoundsException when a field was missing.
        if (tokens.length < 2) {
            return;
        }
        String[] towerIdEx = tokens[1].split("=");
        if (towerIdEx.length < 2) {
            return;
        }
        String towerId = towerIdEx[1];
        // Emit only when a towerId is present. The original wrote outside the
        // null check, which would have re-emitted the stale previous key held
        // in the reused Text whenever the current record had no towerId.
        if (!towerId.isEmpty()) {
            outTowerId.set(towerId);
            context.write(outTowerId, NullWritable.get());
        }
    }
}
The reducer code is a plain identity reducer that writes the key along with the NullWritable values.
package com.rajkrrsingh.distinct;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Identity reducer: after the shuffle, each distinct towerId key reaches
 * reduce() exactly once, so writing the key alone yields the distinct set.
 */
public class DistinctReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    /**
     * @param towerId       a unique tower id key
     * @param ignoredValues grouped NullWritable placeholders (discarded)
     * @param context       MapReduce context used to emit the distinct key
     */
    @Override
    protected void reduce(Text towerId, Iterable<NullWritable> ignoredValues, Context context)
            throws IOException, InterruptedException {
        // The grouped key by itself represents one distinct record.
        context.write(towerId, NullWritable.get());
    }
}
Now configure your driver class as follows:
package com.rajkrrsingh.distinct;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
 * Driver that wires up the distinct-records MapReduce job.
 *
 * <p>Usage: {@code DistinctDriver <input> <output>}. Exits with 2 on bad
 * arguments, 0 on job success, 1 on job failure.
 */
public class DistinctDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        // Strip generic Hadoop options (-D, -files, ...) before validating.
        String[] commandLineArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (commandLineArgs.length != 2) {
            System.err.println("Usage : DistinctDriver <input> <output>");
            System.exit(2);
        }
        // Job.getInstance replaces the deprecated new Job(conf, name) constructor.
        Job job = Job.getInstance(conf, "Distinct Towers");
        job.setJarByClass(DistinctDriver.class);
        job.setMapperClass(DistinctMapper.class);
        // The identity reducer is combiner-safe for distinct: running it on the
        // map side dedupes locally and shrinks the shuffle without changing output.
        job.setCombinerClass(DistinctReducer.class);
        job.setReducerClass(DistinctReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(commandLineArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(commandLineArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}