Sunday, November 24, 2013

HBase compatibility with Hadoop 2.x

HBase releases downloaded from the mirror site are compiled against a certain Hadoop version. For example, hbase-0.94.7 and hbase-0.92.2 are compatible with the Hadoop 1.0.x versions. They won't work with Hadoop 2.x.

If you have a Hadoop 2.x version installed on your system, you need to download the HBase source and compile it against Hadoop 2.x.

Let's make it work with Hadoop 2.x:

Download and install the latest version of Maven

Install subversion
$sudo yum install subversion

Checkout HBase code
$svn co http://svn.apache.org/repos/asf/hbase/tags/0.94.7 hbase-0.94.7

Now go to the HBase directory and build an HBase tarball; use the -Dhadoop.profile option to compile against Hadoop 2.x:
$export MAVEN_OPTS="-Xmx2g"
$mvn clean install javadoc:aggregate site assembly:assembly -DskipTests -Prelease -Dhadoop.profile=2.0

Verify that a *.tar.gz file is produced in the target directory.

Tuesday, November 19, 2013

Apache Oozie Workflow : Configuring and Running a MapReduce Job

In this post I will demonstrate how to configure an Oozie workflow. Let's develop a simple MapReduce program using Java; if you find any difficulty doing it, then download the code from my git location. Download

Please follow my earlier post to install and run the Oozie server, then create a job directory, say SimpleOozieMR, with the following directory structure:

---SimpleOozieMR
----workflow
-----lib
-----workflow.xml

In the lib folder copy your Hadoop job jar and related jars.
Let's configure our workflow.xml and keep it in the workflow directory as shown:
<workflow-app name="WorkFlowPatentCitation" xmlns="uri:oozie:workflow:0.1">
    <start to="JavaMR-Job"/>
        <action name="JavaMR-Job">
                <java>
                        <job-tracker>${jobTracker}</job-tracker>
                        <name-node>${nameNode}</name-node>
                        <prepare>
                                <delete path="${outputDir}"/>
                        </prepare>
                        <configuration>
                                <property>
                                        <name>mapred.queue.name</name>
                                        <value>default</value>
                                </property>
                        </configuration>
                        <main-class>com.rjkrsinghhadoop.App</main-class>
                        <arg>${citationIn}</arg>
                        <arg>${citationOut}</arg>
                </java>
                <ok to="end"/>
                <error to="fail"/>
        </action>
        <kill name="fail">
            <message>"Killed job due to error: ${wf:errorMessage(wf:lastErrorNode())}"</message>
        </kill>
    <end name="end" />
</workflow-app>

Now configure your properties file PatentCitation.properties as follows
nameNode=hdfs://master:8020
jobTracker=master:8021
queueName=default
citationIn=citationIn-hdfs
citationOut=citationOut-hdfs
oozie.wf.application.path=${nameNode}/user/rks/oozieworkdir/SimpleOozieMR/workflow

Let's create a shell script which will run your first Oozie job:
#!/bin/sh
#
export OOZIE_URL="http://localhost:11000/oozie"
#copy your input data to the hdfs
hadoop fs -copyFromLocal /home/rks/CitationInput.txt citationIn-hdfs
#copy SimpleOozieMR to hdfs
hadoop fs -put /home/rks/SimpleOozieMR SimpleOozieMR
#running the oozie job
cd /usr/lib/oozie/bin/
oozie job -config /home/rks/SimpleOozieMR/PatentCitation.properties -run

Apache Oozie : Getting Started

Apache Oozie Introduction:

--- Started by Yahoo, currently managed as an Apache open source project.
--- Oozie is a workflow scheduler system to manage Apache Hadoop jobs:
-- MapReduce
-- Pig, Hive
-- Streaming
-- Standard applications
--- Oozie is a scalable, reliable and extensible system.

--- The user specifies the action flow as a Directed Acyclic Graph (DAG).
--- DAG: a collection of vertices and directed edges configured so that one may not traverse the same vertex twice.
--- Each node signifies either a job or a script. Execution and branching can be parameterized by time, decision, data availability, file size etc.
--- The client specifies the process flow in a workflow XML file.
--- Oozie is an extra level of abstraction between the user and Hadoop.
--- Oozie has its own server application which talks to its own database (Apache Derby by default; MySQL, Oracle etc. are also supported).
--- The user must load the required components into HDFS prior to execution: input data, the workflow XML, JARs and resource files.





Interaction with Oozie through command line
$oozie job -oozie http://localhost:11000/oozie -config /user/rks/spjob/job.properties -run


Web Interface
--- Oozie also provides a web console (ExtJS based), available by default at http://localhost:11000/oozie.

Installation
- Download Oozie from the official Apache Oozie site
- Download ExtJS
- Configure core-site.xml
- Restart the namenode
- Copy the Hadoop jars into a directory
- Extract ExtJS into Oozie's webapp
- Run oozie-setup.sh
- Relocate the newly generated war file
- Configure oozie-site.xml
- Initialize the database
- Start the Oozie server

That's it. In the next post we will run a MapReduce job configured using Oozie. Stay tuned.






Saturday, November 16, 2013

WebHDFS REST API -- Overview


Hadoop supports all the HDFS operations, such as ls, mkdir, cat, rm, merge etc., through the underlying Java implementation of the HDFS commands.
Most of the time the need arises to access HDFS from external applications rather than from within the Hadoop cluster, and the external system may be written in a programming language other than Java.

To support access to HDFS from external applications, Hadoop provides the WebHDFS REST API, which is based on the common HTTP methods GET, PUT, POST and DELETE. Read operations such as OPEN, GETFILESTATUS and LISTSTATUS use HTTP GET; others such as CREATE, MKDIRS, RENAME and SETPERMISSION rely on HTTP PUT; APPEND uses HTTP POST, while DELETE uses HTTP DELETE.


WebHDFS REST API OVERVIEW



In order to configure WebHDFS, update hdfs-site.xml as follows:
<property>
        <name>dfs.webhdfs.enabled</name>
        <value>true</value>
</property>

Now restart your namenode to access WebHDFS.
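To quickly verify that WebHDFS is reachable, here is a minimal Java sketch (not part of the original post's code) that issues a LISTSTATUS call over HTTP GET; the host name, port, path and user.name are placeholders for your own cluster, and the port assumes the usual namenode HTTP address (50070).

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

// Minimal WebHDFS smoke test: list a directory with the LISTSTATUS operation.
// Host, port, path and user.name are placeholders -- adjust them for your cluster.
public class WebHdfsListStatus {
    public static void main(String[] args) throws Exception {
        URL url = new URL("http://master:50070/webhdfs/v1/user/rks?op=LISTSTATUS&user.name=rks");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");

        // the namenode answers with a JSON FileStatuses document in the response body
        BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line);
        }
        reader.close();
        conn.disconnect();
    }
}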

In the coming post I will write about a more complete Java client for the WebHDFS REST API... stay tuned.

Monday, November 11, 2013

MapReduce : Writing output to multiple files using MultipleOutputFormat

Multiple Outputs
FileOutputFormat and its subclasses generate a set of files in the output directory. There is one file per reducer, and files are named by the partition number: part-00000, part-00001, etc. There is sometimes a need to have more control over the naming of the files or to produce multiple files per reducer. MapReduce comes with two libraries to help you do this: MultipleOutputFormat and MultipleOutputs.

MultipleOutputFormat
MultipleOutputFormat allows you to write data to multiple files whose names are derived from the output keys and values. MultipleOutputFormat is an abstract class with two concrete subclasses, MultipleTextOutputFormat and MultipleSequenceFileOutputFormat, which are the multiple file equivalents of TextOutputFormat and SequenceFileOutputFormat. MultipleOutputFormat provides a few protected methods that subclasses can override to control the output filename. In Example 7-5, we create a subclass of MultipleTextOutputFormat to override the generateFileNameForKeyValue() method to return the station ID, which we extracted from the record value.

-- reference Hadoop Definitive guide
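The passage above describes the old org.apache.hadoop.mapred API. As a rough sketch of that style (the class name below is only illustrative and is not used in the example that follows, which relies on the new-API MultipleOutputs class instead), a MultipleTextOutputFormat subclass would override generateFileNameForKeyValue() like this:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

// Old-API sketch: derive the output file name from the record key so that
// all records sharing a key end up in the same file.
public class KeyBasedOutputFormat extends MultipleTextOutputFormat<Text, Text> {

    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, String name) {
        // "name" is the default leaf name such as part-00000;
        // prefix it with the key so each key gets its own output file.
        return key.toString() + "/" + name;
    }
}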

In this example I will demonstrate how to write output data to multiple files. You can find the code for this example at the following git location.

We have sample customer data with the attributes customer no, customer name, region and company. We will write customers from the same region to the same file, along with their other attributes.
customer no,customer name,region,company
.................................................................
9899821411,"Burke, Honorato U.",Alaska,Eu Incorporated
9899821422,"Bell, Emily R.",Arizona,Ut Eros Non Company
9899821379,"Hewitt, Chelsea Y.",PA,Egestas Aliquam Fringilla LLP
9899821387,"Baldwin, Merrill H.",VT,Rhoncus Proin Corp.
9899821392,"Bradshaw, Uma H.",OH,Nam Nulla Associates
9899821453,"Pollard, Boris G.",Hawaii,Consequat Corp.
9899821379,"Avila, Velma D.",OR,Sodales LLC

Create your Mapper Class as follows:
package com.rajkrrsingh.mr.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MultipleOutputMapper extends Mapper<LongWritable, Text, Text, Text> {
 
 private Text txtKey = new Text("");
 private Text txtValue = new Text("");
 
 @Override
 protected void map(LongWritable key, Text value,Context context)
   throws IOException, InterruptedException {
  if(value.toString().length() > 0) {
   // split on commas that are not inside quotes so that names like "Burke, Honorato U." stay intact
   String[] custArray = value.toString().split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)");
   // key on the region so that customers from the same region are written to the same file
   txtKey.set(custArray[2].trim());
   txtValue.set(custArray[0].trim() + "\t" + custArray[1].trim() + "\t" + custArray[3].trim());
   context.write(txtKey, txtValue);
  }
 }

}

Set up your reducer class:
package com.rajkrrsingh.mr.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class MultiOutputReducer extends Reducer<Text, Text, Text, Text>{
 
 private MultipleOutputs<Text, Text> multipleOutputs;
 
 @Override
 protected void setup(Context context) throws IOException, InterruptedException {
  multipleOutputs = new MultipleOutputs<Text, Text>(context);
 }
 
 @Override
 protected void reduce(Text key, Iterable<Text> values,Context context)
   throws IOException, InterruptedException {
  for(Text value : values) {
   multipleOutputs.write(key, value, key.toString());
  }
 }
 
 @Override
 protected void cleanup(Context context)
   throws IOException, InterruptedException {
  multipleOutputs.close();
 }

}

Set up your driver class as follows:
package com.rajkrrsingh.mr.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class App extends Configured implements Tool
{
    public static void main( String[] args ) throws Exception
    {
       int exitCode = ToolRunner.run(new Configuration(),new App(),args);
       System.exit(exitCode);
    }

 @Override
 public int run(String[] args) throws Exception {
  if(args.length != 2) {
   System.out.println("Two parameters are required to execute App: <input-path> <output-path>");
   return -1;
  }
  
  Job job = new Job(getConf());
  job.setJobName("MultipleOutputFormat example");
  job.setJarByClass(App.class);
  LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
  job.setMapperClass(MultipleOutputMapper.class);
  job.setMapOutputKeyClass(Text.class);
  job.setReducerClass(MultiOutputReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setNumReduceTasks(5);
  
  boolean success = job.waitForCompletion(true);
  return success ? 0 : 1;
 }
}

Create the job jar using the Maven assembly plugin and run it on your Hadoop cluster:
$bin/hadoop jar /home/rks/MultipleOutputExample/target/MultipleOutputExample.jar com.rajkrrsingh.mr.hadoop.App 
/user/rks/input /user/rks/output

Monday, October 28, 2013

Hadoop : Joining two datasets using Map Side Join

It's inevitable that you'll come across data analyses where you need to pull in data from different sources. For example, given our customer order data sets, you may want to find out whether certain orders were placed by customers and pull in their detailed information, such as address. You'll have to look at the customer data (cust_data.txt) as well as the customer order data (cust_order.txt). In the database world it would just be a matter of joining two tables, and most databases automatically take care of the join processing for you. Unfortunately, joining data in Hadoop is more involved, and there are several possible approaches with different trade-offs.
You can download the complete code from my github.
In this example I will demonstrate how to do a map side join using the distributed cache.

Input Data : order_custid.txt
..............................................................................

781571544 S9876126
781571545 S9876127
781571546 S9876128
781571547 S9876129
781571548 S9876130
781571549 S9876131
781571550 S9876132
781571551 S9876133
781571552 S9876134
our customer dataset
Customer Data : cust_data.txt
.............................................................................

781571544 Smith,John      (248)-555-9430  jsmith@aol.com
781571545 Hunter,April    (815)-555-3029  april@showers.org
781571546 Ching,Iris      (305)-555-0919  iching@zen.org
781571547 Doe,John        (212)-555-0912  jdoe@morgue.com
781571548 Jones,Tom       (312)-555-3321  tj2342@aol.com
781571549 Smith,John      (607)-555-0023  smith@pocahontas.com
781571550 Crosby,Dave     (405)-555-1516  cros@csny.org
781571551 Johns,Pam       (313)-555-6790  pj@sleepy.com
781571552 Jetter,Linda    (810)-555-8761  netless@earthlink.net

We will create our mapper class; in its setup method we will parse the order_custid file available in the distributed cache and keep the custid and order no in a HashMap.

package com.rajkrrsingh.hadoop.mapjoin;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
 
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
 
public class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
 
        private static HashMap<String, String> CustIdOrderMap = new HashMap<String, String>();
        private BufferedReader brReader;
        private String orderNO = "";
        private Text outKey = new Text("");
        private Text outValue = new Text("");
 
        enum MYCOUNTER {
                RECORD_COUNT, FILE_EXISTS, FILE_NOT_FOUND, SOME_OTHER_ERROR
        }
 
        @Override
        protected void setup(Context context) throws IOException,
                        InterruptedException {
 
                Path[] cacheFilesLocal = DistributedCache.getLocalCacheFiles(context
                                .getConfiguration());
 
                for (Path eachPath : cacheFilesLocal) {
                        if (eachPath.getName().toString().trim().equals("order_custid.txt")) {
                                context.getCounter(MYCOUNTER.FILE_EXISTS).increment(1);
                                setupOrderHashMap(eachPath, context);
                        }
                }
 
        }
 
        private void setupOrderHashMap(Path filePath, Context context)
                        throws IOException {
 
                String strLineRead = "";
 
                try {
                        brReader = new BufferedReader(new FileReader(filePath.toString()));
 
                        while ((strLineRead = brReader.readLine()) != null) {
                                String custIdOrderArr[] = strLineRead.toString().split("\\s+");
                                CustIdOrderMap.put(custIdOrderArr[0].trim(),        custIdOrderArr[1].trim());
                        }
                } catch (FileNotFoundException e) {
                        e.printStackTrace();
                        context.getCounter(MYCOUNTER.FILE_NOT_FOUND).increment(1);
                } catch (IOException e) {
                        context.getCounter(MYCOUNTER.SOME_OTHER_ERROR).increment(1);
                        e.printStackTrace();
                }finally {
                        if (brReader != null) {
                                brReader.close();
 
                        }
 
                }
        }
 
        @Override
        public void map(LongWritable key, Text value, Context context)
                        throws IOException, InterruptedException {
 
                context.getCounter(MYCOUNTER.RECORD_COUNT).increment(1);
 
                if (value.toString().length() > 0) {
                        String custDataArr[] = value.toString().split("\\s+");
 
                        orderNO = CustIdOrderMap.get(custDataArr[0].trim());
                        // guard against customers with no matching order in the cache file
                        if (orderNO == null || orderNO.isEmpty()) {
                                orderNO = "NOT-FOUND";
                        }
 
                        outKey.set(custDataArr[0].toString());
 
                        outValue.set(custDataArr[1].toString() + "\t"
                                        + custDataArr[2].toString() + "\t"
                                        + custDataArr[3].toString() + "\t" + orderNO);
 
                        context.write(outKey, outValue);
                }
                orderNO = "";
        }
}

Set up the driver class as follows:
package com.rajkrrsingh.hadoop.mapjoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class App extends Configured implements Tool
{
    public static void main( String[] args ) throws Exception
    {
            int exitCode = ToolRunner.run(new Configuration(),new App(), args);
            System.exit(exitCode);
        }
    @Override
    public int run(String[] args) throws Exception {
            if(args.length !=2 ){
                    System.err.println("Usage : App -files <location-to-cust-id-and-order-file> <input path> <output path>");
                    System.exit(-1);
            }
            Job job = new Job(getConf());
            job.setJobName("Map Side Join");
            job.setJarByClass(App.class);
            FileInputFormat.addInputPath(job,new Path(args[0]) );
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            job.setMapperClass(MapSideJoinMapper.class);
            job.setNumReduceTasks(0);
            
            boolean success = job.waitForCompletion(true);
                return success ? 0 : 1;
            
    }
}
Here we have set the number of reducers to 0 so that the map output is written directly to the output files.
Now create a jar and run the following command on your Hadoop cluster:
$bin/hadoop jar /home/rks/example/HadoopMapSideJoin/target/HadoopMapSideJoin.jar com.rajkrrsingh.hadoop.mapjoin -files /home/rks/Downloads/order_custid.txt input output

That is what we need, and here is the result.
output::
.................................................................................
781571544 Smith,John (248)-555-9430 jsmith@aol.com S9876126
781571545 Hunter,April (815)-555-3029 april@showers.org S9876127
781571546 Ching,Iris (305)-555-0919 iching@zen.org S9876128
781571547 Doe,John (212)-555-0912 jdoe@morgue.com S9876129
781571548 Jones,Tom (312)-555-3321 tj2342@aol.com S9876130
781571549 Smith,John (607)-555-0023 smith@pocahontas.com S9876131
781571550 Crosby,Dave (405)-555-1516 cros@csny.org S9876132
781571551 Johns,Pam (313)-555-6790 pj@sleepy.com S9876133
781571552 Jetter,Linda (810)-555-8761 netless@earthlink.net S9876134