It’s inevitable that you’ll come across data analyses where you need to pull in data from different sources. For example, given our customer and order data sets, you may want to find out which orders were placed by which customers, together with the customers’ details such as name and contact information. You’ll have to look at the customer data (cust_data.txt) as well as the customer order data (cust_order.txt). In the database world it would just be a matter of joining two tables, and most databases automatically take care of the join processing for you. Unfortunately, joining data in Hadoop is more involved, and there are several possible approaches with different trade-offs.
You can download the complete code from my GitHub.
In this example I will demonstrate how to implement a map-side join using the distributed cache.
Input Data : order_custid.txt
..............................................................................
781571544    S9876126
781571545    S9876127
781571546    S9876128
781571547    S9876129
781571548    S9876130
781571549    S9876131
781571550    S9876132
781571551    S9876133
781571552    S9876134

Our customer data set:
Customer Data : cust_data.txt
.............................................................................
781571544    Smith,John       (248)-555-9430    jsmith@aol.com
781571545    Hunter,April     (815)-555-3029    april@showers.org
781571546    Ching,Iris       (305)-555-0919    iching@zen.org
781571547    Doe,John         (212)-555-0912    jdoe@morgue.com
781571548    Jones,Tom        (312)-555-3321    tj2342@aol.com
781571549    Smith,John       (607)-555-0023    smith@pocahontas.com
781571550    Crosby,Dave      (405)-555-1516    cros@csny.org
781571551    Johns,Pam        (313)-555-6790    pj@sleepy.com
781571552    Jetter,Linda     (810)-555-8761    netless@earthlink.net
We will create our mapper class. In its setup method we parse the order_custid file available in the distributed cache and keep each custid and order number pair in a HashMap.
package com.rajkrrsingh.hadoop.mapjoin;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, Text> {

    // custid -> order number lookup table built from the cached order_custid.txt
    private static HashMap<String, String> CustIdOrderMap = new HashMap<String, String>();
    private BufferedReader brReader;
    private String orderNO = "";
    private Text outKey = new Text("");
    private Text outValue = new Text("");

    enum MYCOUNTER {
        RECORD_COUNT, FILE_EXISTS, FILE_NOT_FOUND, SOME_OTHER_ERROR
    }

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // locate order_custid.txt among the files shipped via the distributed cache
        Path[] cacheFilesLocal = DistributedCache.getLocalCacheFiles(context.getConfiguration());

        for (Path eachPath : cacheFilesLocal) {
            if (eachPath.getName().toString().trim().equals("order_custid.txt")) {
                context.getCounter(MYCOUNTER.FILE_EXISTS).increment(1);
                setupOrderHashMap(eachPath, context);
            }
        }
    }

    private void setupOrderHashMap(Path filePath, Context context) throws IOException {
        String strLineRead = "";
        try {
            brReader = new BufferedReader(new FileReader(filePath.toString()));
            // each line is: <custid> <order no>
            while ((strLineRead = brReader.readLine()) != null) {
                String custIdOrderArr[] = strLineRead.toString().split("\\s+");
                CustIdOrderMap.put(custIdOrderArr[0].trim(), custIdOrderArr[1].trim());
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
            context.getCounter(MYCOUNTER.FILE_NOT_FOUND).increment(1);
        } catch (IOException e) {
            context.getCounter(MYCOUNTER.SOME_OTHER_ERROR).increment(1);
            e.printStackTrace();
        } finally {
            if (brReader != null) {
                brReader.close();
            }
        }
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        context.getCounter(MYCOUNTER.RECORD_COUNT).increment(1);

        if (value.toString().length() > 0) {
            String custDataArr[] = value.toString().split("\\s+");

            // look up the order number for this customer in the in-memory map
            orderNO = CustIdOrderMap.get(custDataArr[0].toString());
            orderNO = (orderNO == null || orderNO.isEmpty()) ? "NOT-FOUND" : orderNO;

            outKey.set(custDataArr[0].toString());
            outValue.set(custDataArr[1].toString() + "\t" + custDataArr[2].toString()
                    + "\t" + custDataArr[3].toString() + "\t" + orderNO);
        }
        context.write(outKey, outValue);
        orderNO = "";
    }
}
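A note for newer Hadoop releases: the DistributedCache class used above is deprecated in Hadoop 2.x and later. If you are on such a version, the setup method could instead pull the cache file URIs straight off the task context. The following is only a sketch, reusing the setupOrderHashMap helper from the mapper above, and it assumes the file was shipped with -files (which also symlinks it into the task's working directory under its original name):

    // Sketch only (Hadoop 2.x+): would replace the setup() method of MapSideJoinMapper above.
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // URIs of all files registered in the distributed cache (e.g. via -files)
        java.net.URI[] cacheFiles = context.getCacheFiles();
        if (cacheFiles == null) {
            return;
        }
        for (java.net.URI cacheFile : cacheFiles) {
            if (new Path(cacheFile.getPath()).getName().equals("order_custid.txt")) {
                context.getCounter(MYCOUNTER.FILE_EXISTS).increment(1);
                // -files symlinks the file into the working directory, so open it by name
                setupOrderHashMap(new Path("order_custid.txt"), context);
            }
        }
    }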
Set up the driver class as follows:
package com.rajkrrsingh.hadoop.mapjoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class App extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new App(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage : App -files <location-to-cust-id-and-order-file> <input path> <output path>");
            System.exit(-1);
        }

        Job job = new Job(getConf());
        job.setJobName("Map Side Join");
        job.setJarByClass(App.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MapSideJoinMapper.class);
        // map-only job: no reducers, so mapper output goes straight to the output files
        job.setNumReduceTasks(0);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }
}

Here we have set the number of reduce tasks to 0 so that the output of the map can be written directly to the output file.
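If you would rather not pass the lookup file on the command line, it can also be registered with the distributed cache programmatically in the driver. The HDFS path below is only an example; the file still has to be named order_custid.txt so that the mapper above finds it. A minimal sketch of what run() could look like with this change:

    // Sketch: requires import org.apache.hadoop.filecache.DistributedCache; and java.net.URI
    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        job.setJobName("Map Side Join");
        job.setJarByClass(App.class);

        // register the lookup file from the driver instead of using -files;
        // the HDFS location is an example, adjust it to wherever order_custid.txt lives
        DistributedCache.addCacheFile(
                new java.net.URI("/user/rks/cache/order_custid.txt"), job.getConfiguration());

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MapSideJoinMapper.class);
        job.setNumReduceTasks(0);

        return job.waitForCompletion(true) ? 0 : 1;
    }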
Now create a jar and run the following command on your Hadoop cluster:
$ bin/hadoop jar /home/rks/example/HadoopMapSideJoin/target/HadoopMapSideJoin.jar com.rajkrrsingh.hadoop.mapjoin.App -files /home/rks/Downloads/order_custid.txt input output
That is what we need, and here is the result:
output::
.................................................................................
781571544    Smith,John       (248)-555-9430    jsmith@aol.com           S9876126
781571545    Hunter,April     (815)-555-3029    april@showers.org        S9876127
781571546    Ching,Iris       (305)-555-0919    iching@zen.org           S9876128
781571547    Doe,John         (212)-555-0912    jdoe@morgue.com          S9876129
781571548    Jones,Tom        (312)-555-3321    tj2342@aol.com           S9876130
781571549    Smith,John       (607)-555-0023    smith@pocahontas.com     S9876131
781571550    Crosby,Dave      (405)-555-1516    cros@csny.org            S9876132
781571551    Johns,Pam        (313)-555-6790    pj@sleepy.com            S9876133
781571552    Jetter,Linda     (810)-555-8761    netless@earthlink.net    S9876134