Wednesday, August 28, 2013

Apache Hadoop and Spring Data: Configuring a MapReduce Job

Spring for Apache Hadoop simplifies developing Apache Hadoop applications by providing a unified configuration model and easy-to-use APIs for HDFS, MapReduce, Pig, and Hive. It also provides integration with other Spring ecosystem projects such as Spring Integration and Spring Batch, enabling you to develop solutions for big data ingest/export and Hadoop workflow orchestration.

In this tutorial I am going to demonstrate how to configure a MapReduce job with Spring. The complete source code is available on GitHub.

I assume that your Hadoop cluster is up and running.

Let's set up a simple Java project using Maven and add the following dependencies to the pom.xml.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.rajkrrsingh.hadoop</groupId>
  <artifactId>HadoopSpringData</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>HadoopSpringData</name>
  <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <apache.hadoop.version>1.0.4</apache.hadoop.version>
        <slf4j.version>1.6.1</slf4j.version>
        <spring.version>3.1.2.RELEASE</spring.version>
        <spring.data.hadoop.version>1.0.0.RELEASE</spring.data.hadoop.version>
    </properties>
    <dependencies>
        <!-- Apache Commons -->
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>

        <!-- Spring Framework -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>cglib</groupId>
            <artifactId>cglib</artifactId>
            <version>2.2.2</version>
        </dependency>
        <!-- Spring Data Apache Hadoop -->
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-hadoop</artifactId>
            <version>${spring.data.hadoop.version}</version>
        </dependency>
        <!-- Apache Hadoop Core -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>${apache.hadoop.version}</version>
        </dependency>
        <!-- Logging dependencies -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.16</version>
        </dependency>
        <!-- Testing Dependencies -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.9</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-core</artifactId>
            <version>1.8.5</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <finalName>SpringHadoopJob</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.2.2</version>
                <configuration>
                    <descriptors>
                        <descriptor>src/main/assembly/assembly.xml</descriptor>
                    </descriptors>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.3.1</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>com.rajkrrsingh.hadoop.MaxTemperature</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-site-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <reportPlugins>
                        <!-- Cobertura Plugin -->
                        <plugin>
                            <groupId>org.codehaus.mojo</groupId>
                            <artifactId>cobertura-maven-plugin</artifactId>
                            <version>2.5.1</version>
                        </plugin>
                    </reportPlugins>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Let us take the Max Temperature MapReduce example from Hadoop: The Definitive Guide and set up the Mapper, Reducer, and main class.
Here is the code for the Mapper:
package com.rajkrrsingh.hadoop;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper
  extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999;
  
  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    
    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      context.write(new Text(year), new IntWritable(airTemperature));
    }
  }
}
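
The mapper relies on fixed-width offsets in each input record (the year at columns 15-19, the signed temperature at 87-92, and the quality code at 92). If you want to sanity-check those offsets locally before submitting the job, here is a small standalone sketch (the class name and the synthetic record are mine, not part of the tutorial's source) that runs the same substring logic:

package com.rajkrrsingh.hadoop;

// Standalone sketch that mimics the mapper's fixed-width parsing so the
// offsets can be checked locally. The record below is synthetic and only
// padded to the positions the mapper reads; real input comes from the
// weather dataset used in the book.
public class MaxTemperatureMapperSketch {

  public static void main(String[] args) {
    // Build a 93-character line: year at offsets 15-18, signed temperature
    // at 87-91, quality code at 92. Everything else is filler.
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < 93; i++) {
      sb.append('0');
    }
    sb.replace(15, 19, "1950");   // year
    sb.replace(87, 92, "+0022");  // temperature in tenths of a degree
    sb.replace(92, 93, "1");      // quality code accepted by the mapper
    String line = sb.toString();

    String year = line.substring(15, 19);
    int airTemperature = line.charAt(87) == '+'
        ? Integer.parseInt(line.substring(88, 92))
        : Integer.parseInt(line.substring(87, 92));
    String quality = line.substring(92, 93);

    System.out.println(year + " -> " + airTemperature + " (quality " + quality + ")");
  }
}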

Set up the Reducer as follows:

package com.rajkrrsingh.hadoop;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer
  extends Reducer<Text, IntWritable, Text, IntWritable> {
  
  @Override
  public void reduce(Text key, Iterable<IntWritable> values,
      Context context)
      throws IOException, InterruptedException {
    
    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    context.write(key, new IntWritable(maxValue));
  }
}


In our main class we execute the job through the Spring application context, so we need to set up an ApplicationContext. Before that, we need to provide the application-specific properties file as follows.
application.properties
fs.default.name=hdfs://master:54310
mapred.job.tracker=master:54311

input.path=/user/rajkrrsingh/SAMPLE
output.path=/user/rajkrrsingh/MaxTempOut

In the application.properties file we provide the NameNode location, the JobTracker location, and the input and output paths for the data. The sample input data can be downloaded here.

Now we need to set up our ApplicationContext. In the configuration below, the Hadoop connection is provided through the hdp:configuration element, and the job itself is set up under the hdp:job element, where we supply the input and output paths, the job driver class, and our Mapper and Reducer.

The last piece is the job runner, which invokes the configured job at startup. If you have multiple MapReduce jobs, they can be configured the same way in applicationContext.xml.

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:hdp="http://www.springframework.org/schema/hadoop"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">

    <!-- Configures the properties file. -->
    <context:property-placeholder location="classpath:application.properties" />

    <!-- Configures Apache Hadoop -->
    <hdp:configuration>
        fs.default.name=${fs.default.name}
        mapred.job.tracker=${mapred.job.tracker}
    </hdp:configuration>

    <!-- Configures the Max Temp job. -->
    <hdp:job id="MaxTempJob"
             input-path="${input.path}"
             output-path="${output.path}"
             jar-by-class="com.rajkrrsingh.hadoop.MaxTemperature"
             mapper="com.rajkrrsingh.hadoop.MaxTemperatureMapper"
             reducer="com.rajkrrsingh.hadoop.MaxTemperatureReducer"/>

    <!-- Configures the job runner that runs the Hadoop jobs. -->
    <hdp:job-runner id="MaxTempJobRunner" job-ref="MaxTempJob" run-at-startup="true"/>
</beans>
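
For context, the hdp:job declaration above essentially assembles a standard org.apache.hadoop.mapreduce.Job for you. A rough hand-written Java equivalent of the same wiring, shown purely for illustration (this is a sketch of what the XML configures, not code generated by Spring Data Hadoop), would look like this:

package com.rajkrrsingh.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hand-rolled equivalent of the <hdp:job> declaration, using the values
// from application.properties directly.
public class MaxTempJobSketch {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.default.name", "hdfs://master:54310");
    conf.set("mapred.job.tracker", "master:54311");

    Job job = new Job(conf, "MaxTempJob");
    job.setJarByClass(MaxTemperature.class);
    job.setMapperClass(MaxTemperatureMapper.class);
    job.setReducerClass(MaxTemperatureReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path("/user/rajkrrsingh/SAMPLE"));
    FileOutputFormat.setOutputPath(job, new Path("/user/rajkrrsingh/MaxTempOut"));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}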


Let's access the application context in our job driver class, i.e. MaxTemperature.java, as follows.
package com.rajkrrsingh.hadoop;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class MaxTemperature {

  public static void main(String[] args) throws Exception {
    ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext.xml");
  }
}
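
Because run-at-startup is set to true on the job runner, simply creating the context kicks off the job. If you would rather set run-at-startup to false and launch the job explicitly, the following sketch (assuming the hdp:job element exposes a plain org.apache.hadoop.mapreduce.Job bean under the id MaxTempJob; the class name is mine) retrieves the job from the context and waits for it to finish:

package com.rajkrrsingh.hadoop;

import org.apache.hadoop.mapreduce.Job;
import org.springframework.context.support.ClassPathXmlApplicationContext;

// Variant of the driver for the case where run-at-startup is false:
// look up the Job bean declared as <hdp:job id="MaxTempJob"> and run it.
public class MaxTemperatureExplicit {

  public static void main(String[] args) throws Exception {
    boolean success;
    ClassPathXmlApplicationContext ctx =
        new ClassPathXmlApplicationContext("applicationContext.xml");
    try {
      Job maxTempJob = ctx.getBean("MaxTempJob", Job.class);
      success = maxTempJob.waitForCompletion(true);
    } finally {
      ctx.close();
    }
    System.exit(success ? 0 : 1);
  }
}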

Now it's time to package our jar and run it on the cluster. For that I have provided the assembly.xml under the resources folder; run mvn assembly:assembly, which will create a zip file containing SpringDataHadoop.jar.

Now, on the NameNode, run your jar as follows:

$hadoop jar SpringDataHadoop.jar

After successful completion, check the output directory.

Monday, August 26, 2013

Import TSV file from HDFS to HBase using Identity Mapper


In this tutorial I am going to demonstrate how to import a tab-separated file stored on HDFS into an HBase table.

Let's start by creating a table in HBase.

Step 1 : Create a table in HBase named orders with the column families 'ship_to_address', 'ord_date', 'ship_date', 'item', 'status', and 'price':

create 'orders','ship_to_address','ord_date','ship_date','item','status','price'
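
If you prefer to create the table from code instead of the shell, a sketch along these lines should work with the HBase client API of that era (HBaseAdmin and HTableDescriptor); the class name is mine:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

// Programmatic equivalent of the shell command above.
public class CreateOrdersTable {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);

        HTableDescriptor table = new HTableDescriptor("orders");
        for (String family : new String[] {
                "ship_to_address", "ord_date", "ship_date", "item", "status", "price"}) {
            table.addFamily(new HColumnDescriptor(family));
        }

        if (!admin.tableExists("orders")) {
            admin.createTable(table);
        }
        admin.close();
    }
}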

Here is our input TSV file stored on HDFS:

USA NY New York 28-07-2013 29-07-2013 Toner shipped 200$
USA California San Fransico 29-07-2013 29-07-2013 Cati in process 150$
USA NY Rochester 28-07-2013 28-07-2013 Toner shipped 200$
USA NY Syracuse 21-07-2013 23-07-2013 Paper shipped 80$
USA NY Albany 21-07-2013 21-07-2013 Paper failed 80$
USA California Long Beach 26-07-2013 28-07-2013 Toner shipped 200$

Step 2 : Write your identity Mapper class as follows:
import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;

public class ImportFromTSVMapper extends
        Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> {

    @Override
    public void map(LongWritable offset, Text line, Context context)
            throws IOException {
        try {
            // Split the tab-separated record and use the first field as the row key.
            String lineString = line.toString();
            String[] arr = lineString.split("\t");
            Put put = new Put(arr[0].getBytes());
            // The remaining fields go into the column families created in Step 1.
            put.add("ship_to_address".getBytes(), "country".getBytes(), Bytes.toBytes(arr[1]));
            put.add("ship_to_address".getBytes(), "state".getBytes(), Bytes.toBytes(arr[2]));
            put.add("ship_to_address".getBytes(), "city".getBytes(), Bytes.toBytes(arr[3]));
            put.add("ord_date".getBytes(), "ord_date".getBytes(), Bytes.toBytes(arr[4]));
            put.add("ship_date".getBytes(), "ship_date".getBytes(), Bytes.toBytes(arr[5]));
            put.add("item".getBytes(), "item".getBytes(), Bytes.toBytes(arr[6]));
            put.add("status".getBytes(), "status".getBytes(), Bytes.toBytes(arr[7]));
            put.add("price".getBytes(), "price".getBytes(), Bytes.toBytes(arr[8]));
            context.write(new ImmutableBytesWritable(arr[0].getBytes()), put);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Step 3 : Write your main class to configure the MR job:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class ImportTSVFile {
    
    
   public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        String table = "orders";
        String input = "/home/rajkrrsingh/mspfeed/ordfeed";
        String column = "";

        conf.set("conf.column", column);
        Job job = new Job(conf, "Import from hdfs to hbase");
        job.setJarByClass(ImportTSVFile.class);
        job.setMapperClass(ImportFromTSVMapper.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Writable.class);
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(input));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}
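
As a side note, the HBase mapreduce package also provides TableMapReduceUtil, which can take care of the TableOutputFormat wiring for you (and, depending on the version, ship the HBase jars with the job). Here is a hedged sketch of an alternative driver using it, with a hypothetical class name:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Alternative driver that lets TableMapReduceUtil configure the
// TableOutputFormat and the output table name.
public class ImportTSVFileWithUtil {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "Import from hdfs to hbase");
        job.setJarByClass(ImportTSVFileWithUtil.class);
        job.setMapperClass(ImportFromTSVMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Writable.class);
        job.setNumReduceTasks(0);

        // Wires up TableOutputFormat against the 'orders' table; the null
        // reducer argument means the mapper output goes straight to HBase.
        TableMapReduceUtil.initTableReducerJob("orders", null, job);

        FileInputFormat.addInputPath(job, new Path("/home/rajkrrsingh/mspfeed/ordfeed"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}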


The result can be verified from the HBase shell (for example with scan 'orders'); your TSV file has now been imported into the HBase table.
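
If you would rather verify from code than from the shell, a small scan over the orders table (again a sketch using the old HBase client API; the class name is hypothetical) looks like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

// Scans the 'orders' table and prints each row's item and status columns,
// just to confirm the import landed where we expect.
public class VerifyOrdersImport {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "orders");
        ResultScanner scanner = table.getScanner(new Scan());
        try {
            for (Result result : scanner) {
                String row = Bytes.toString(result.getRow());
                String item = Bytes.toString(result.getValue(Bytes.toBytes("item"), Bytes.toBytes("item")));
                String status = Bytes.toString(result.getValue(Bytes.toBytes("status"), Bytes.toBytes("status")));
                System.out.println(row + " -> " + item + " / " + status);
            }
        } finally {
            scanner.close();
            table.close();
        }
    }
}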