Monday, October 28, 2013

Hadoop : Joining two datasets using Map Side Join

It’s inevitable that you’ll come across data analyses where you need to pull in data from
different sources. For example, given our customer order data sets, you may want to find out
whether certain orders were placed by particular customers, along with their detailed information such as address etc. You'll have to look at customer
data (cust_data.txt) as well as cust_order data (cust_order.txt). In the database world it would just be a matter of joining two tables, and most databases automatically take care of the join processing for you. Unfortunately, joining data in Hadoop is more involved, and there are several possible approaches with different trade-offs.
you can download the complete code from my github
In this example I will demonstrate how to perform a map-side join using the distributed cache.

Input Data : order_custid.txt
..............................................................................

781571544 S9876126
781571545 S9876127
781571546 S9876128
781571547 S9876129
781571548 S9876130
781571549 S9876131
781571550 S9876132
781571551 S9876133
781571552 S9876134
our customer dataset
Customer Data : cust_data.txt
.............................................................................

781571544 Smith,John      (248)-555-9430  jsmith@aol.com
781571545 Hunter,April    (815)-555-3029  april@showers.org
781571546 Ching,Iris      (305)-555-0919  iching@zen.org
781571547 Doe,John        (212)-555-0912  jdoe@morgue.com
781571548 Jones,Tom       (312)-555-3321  tj2342@aol.com
781571549 Smith,John      (607)-555-0023  smith@pocahontas.com
781571550 Crosby,Dave     (405)-555-1516  cros@csny.org
781571551 Johns,Pam       (313)-555-6790  pj@sleepy.com
781571552 Jetter,Linda    (810)-555-8761  netless@earthlink.net

We will create our mapper class. In its setup method we will parse the order_custid file available in the distributed cache and keep each custid and order number in a HashMap.

package com.rajkrrsingh.hadoop.mapjoin;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
 
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
 
/**
 * Map-side join mapper: joins each customer record (from the job input) with
 * its order number, looked up in an in-memory map that is loaded once per task
 * from the distributed-cache file order_custid.txt.
 *
 * Input value format (whitespace-separated): custId name phone email
 * Output: key = custId, value = name \t phone \t email \t orderNo
 */
public class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, Text> {

        // Join table built in setup(): custId -> order number.
        private static HashMap<String, String> CustIdOrderMap = new HashMap<String, String>();
        // Reused output holders to avoid per-record allocation.
        private Text outKey = new Text("");
        private Text outValue = new Text("");

        enum MYCOUNTER {
                RECORD_COUNT, FILE_EXISTS, FILE_NOT_FOUND, SOME_OTHER_ERROR
        }

        /**
         * Locates order_custid.txt among the local cache files shipped via
         * -files and loads it into CustIdOrderMap before any map() call runs.
         */
        @Override
        protected void setup(Context context) throws IOException,
                        InterruptedException {

                Path[] cacheFilesLocal = DistributedCache.getLocalCacheFiles(context
                                .getConfiguration());

                // Guard against no cache files being shipped at all.
                if (cacheFilesLocal == null) {
                        return;
                }

                for (Path eachPath : cacheFilesLocal) {
                        if (eachPath.getName().trim().equals("order_custid.txt")) {
                                context.getCounter(MYCOUNTER.FILE_EXISTS).increment(1);
                                setupOrderHashMap(eachPath, context);
                        }
                }
        }

        /**
         * Parses "custId orderNo" lines from the cached file into
         * CustIdOrderMap. I/O problems are counted and logged, not rethrown,
         * so a bad cache file degrades to NOT-FOUND joins rather than a
         * failed task.
         */
        private void setupOrderHashMap(Path filePath, Context context)
                        throws IOException {

                BufferedReader brReader = null;
                try {
                        brReader = new BufferedReader(new FileReader(filePath.toString()));

                        String strLineRead;
                        while ((strLineRead = brReader.readLine()) != null) {
                                String[] custIdOrderArr = strLineRead.split("\\s+");
                                // Skip malformed/blank lines instead of throwing
                                // ArrayIndexOutOfBoundsException.
                                if (custIdOrderArr.length >= 2) {
                                        CustIdOrderMap.put(custIdOrderArr[0].trim(),
                                                        custIdOrderArr[1].trim());
                                }
                        }
                } catch (FileNotFoundException e) {
                        context.getCounter(MYCOUNTER.FILE_NOT_FOUND).increment(1);
                        e.printStackTrace();
                } catch (IOException e) {
                        context.getCounter(MYCOUNTER.SOME_OTHER_ERROR).increment(1);
                        e.printStackTrace();
                } finally {
                        if (brReader != null) {
                                brReader.close();
                        }
                }
        }

        /**
         * Emits one joined record per non-empty, well-formed input line.
         *
         * BUG FIXES vs. the original:
         * 1. context.write() was outside the non-empty check, so blank input
         *    lines re-emitted the previous stale outKey/outValue (the sample
         *    output showed the last record duplicated). Blank lines are now
         *    skipped entirely.
         * 2. orderNO.equals(null) threw a NullPointerException whenever the
         *    lookup missed; the correct null check is used instead, mapping
         *    misses to "NOT-FOUND".
         */
        @Override
        public void map(LongWritable key, Text value, Context context)
                        throws IOException, InterruptedException {

                context.getCounter(MYCOUNTER.RECORD_COUNT).increment(1);

                String line = value.toString();
                if (line.length() == 0) {
                        return; // skip blank lines; do not re-emit stale output
                }

                String[] custDataArr = line.split("\\s+");
                if (custDataArr.length < 4) {
                        return; // malformed record: not enough fields to join
                }

                String orderNO = CustIdOrderMap.get(custDataArr[0]);
                if (orderNO == null || orderNO.isEmpty()) {
                        orderNO = "NOT-FOUND";
                }

                outKey.set(custDataArr[0]);
                outValue.set(custDataArr[1] + "\t" + custDataArr[2] + "\t"
                                + custDataArr[3] + "\t" + orderNO);
                context.write(outKey, outValue);
        }
}

Setup Driver class as follows:
package com.rajkrrsingh.hadoop.mapjoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


/**
 * Driver for the map-side join job. Run via ToolRunner so that generic
 * options such as -files (which ships order_custid.txt to the distributed
 * cache) are parsed before run() sees the remaining args.
 */
public class App extends Configured implements Tool
{
    public static void main( String[] args ) throws Exception
    {
            int exitCode = ToolRunner.run(new Configuration(), new App(), args);
            System.exit(exitCode);
    }

    /**
     * Configures and submits the map-only join job.
     *
     * BUG FIX vs. the original: a usage error called System.exit(-1) from
     * inside run(), bypassing the Tool/ToolRunner contract; run() now
     * returns a non-zero exit code and lets main() perform the exit.
     *
     * @param args [0] = input path, [1] = output path (after generic options)
     * @return 0 on success, non-zero on usage error or job failure
     */
    @Override
    public int run(String[] args) throws Exception {
            if (args.length != 2) {
                    System.err.println("Usage : App -files <location-to-cust-id-and-order-file> <input path> <output path>");
                    return -1;
            }
            Job job = new Job(getConf());
            job.setJobName("Map Side Join");
            job.setJarByClass(App.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            job.setMapperClass(MapSideJoinMapper.class);
            // Map-only job: with zero reducers the mapper output is written
            // directly to the output directory.
            job.setNumReduceTasks(0);

            return job.waitForCompletion(true) ? 0 : 1;
    }
}
here we have set 0 reducer so that the output of the map can directly written to the output file.
Now create a jar and run the following command on your Hadoop cluster
$bin/hadoop jar /home/rks/example/HadoopMapSideJoin/target/HadoopMapSideJoin.jar com.rajkrrsingh.hadoop.mapjoin -files /home/rks/Downloads/order_custid.txt input output

That is what we need, and here is the result.
output::
.................................................................................
781571544 Smith,John (248)-555-9430 jsmith@aol.com S9876126
781571545 Hunter,April (815)-555-3029 april@showers.org S9876127
781571546 Ching,Iris (305)-555-0919 iching@zen.org S9876128
781571547 Doe,John (212)-555-0912 jdoe@morgue.com S9876129
781571548 Jones,Tom (312)-555-3321 tj2342@aol.com S9876130
781571549 Smith,John (607)-555-0023 smith@pocahontas.com S9876131
781571550 Crosby,Dave (405)-555-1516 cros@csny.org S9876132
781571551 Johns,Pam (313)-555-6790 pj@sleepy.com S9876133
781571552 Jetter,Linda (810)-555-8761 netless@earthlink.net S9876134
781571552 Jetter,Linda (810)-555-8761 netless@earthlink.net S9876134
781571552 Jetter,Linda (810)-555-8761 netless@earthlink.net S9876134