Wednesday, August 28, 2013

Apache Hadoop and Spring Data: Configuring a MapReduce Job

Spring for Apache Hadoop simplifies developing Apache Hadoop applications by providing a unified configuration model and easy-to-use APIs for HDFS, MapReduce, Pig, and Hive. It also provides integration with other Spring ecosystem projects such as Spring Integration and Spring Batch, enabling you to develop solutions for big data ingest/export and Hadoop workflow orchestration.

In this tutorial I am going to demonstrate how to configure a MapReduce job with Spring. The complete source code is available on GitHub.

I assume that your Hadoop cluster is up and running.
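If you want a quick sanity check before going further, listing the HDFS root from the master node will confirm that the cluster is reachable:

$hadoop fs -ls /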

Let's set up a simple Java project using Maven and add the following dependencies to the pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.rajkrrsingh.hadoop</groupId>
  <artifactId>HadoopSpringData</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>HadoopSpringData</name>
  <url>http://maven.apache.org</url>

  <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <apache.hadoop.version>1.0.4</apache.hadoop.version>
        <slf4j.version>1.6.1</slf4j.version>
        <spring.version>3.1.2.RELEASE</spring.version>
        <spring.data.hadoop.version>1.0.0.RELEASE</spring.data.hadoop.version>
    </properties>
    <dependencies>
        <!-- Apache Commons -->
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>

        <!-- Spring Framework -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>cglib</groupId>
            <artifactId>cglib</artifactId>
            <version>2.2.2</version>
        </dependency>
        <!-- Spring Data Apache Hadoop -->
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-hadoop</artifactId>
            <version>${spring.data.hadoop.version}</version>
        </dependency>
        <!-- Apache Hadoop Core -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>${apache.hadoop.version}</version>
        </dependency>
        <!-- Logging dependencies -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.16</version>
        </dependency>
        <!-- Testing Dependencies -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.9</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-core</artifactId>
            <version>1.8.5</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <finalName>SpringHadoopJob</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.2.2</version>
                <configuration>
                    <descriptors>
                        <descriptor>src/main/assembly/assembly.xml</descriptor>
                    </descriptors>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.3.1</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>com.rajkrrsingh.hadoop.MaxTemperature</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-site-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <reportPlugins>
                        <!-- Cobertura Plugin -->
                        <plugin>
                            <groupId>org.codehaus.mojo</groupId>
                            <artifactId>cobertura-maven-plugin</artifactId>
                            <version>2.5.1</version>
                        </plugin>
                    </reportPlugins>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Let's take the Max Temperature MapReduce example from Hadoop: The Definitive Guide and set up the Mapper, the Reducer, and the main class.
Here is our code for the Mapper:
package com.rajkrrsingh.hadoop;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper
  extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999;
  
  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    
    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      context.write(new Text(year), new IntWritable(airTemperature));
    }
  }
}
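Since the POM already pulls in JUnit and Mockito, we can sanity-check the Mapper without a cluster by mocking the Context, the unit-testing pattern used in Hadoop: The Definitive Guide. The record below is a sample fixed-width NCDC-style line chosen to exercise the offsets the Mapper parses; treat this as a minimal sketch rather than part of the job itself.

package com.rajkrrsingh.hadoop;

import static org.mockito.Mockito.*;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.junit.Test;

public class MaxTemperatureMapperTest {

  @Test
  public void processesValidRecord() throws Exception {
    MaxTemperatureMapper mapper = new MaxTemperatureMapper();
    // Sample record: year "1950" at offset 15, temperature "-0011" at offset 87,
    // quality flag "1" at offset 92.
    Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382" +
        "99999V0203201N00261220001CN9999999N9-00111+99999999999");
    MaxTemperatureMapper.Context context = mock(MaxTemperatureMapper.Context.class);
    // The key is unused by the mapper, so null is fine here.
    mapper.map(null, value, context);
    // A valid record should emit (year, temperature in tenths of a degree).
    verify(context).write(new Text("1950"), new IntWritable(-11));
  }
}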

Set up the Reducer as follows:

package com.rajkrrsingh.hadoop;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer
  extends Reducer<Text, IntWritable, Text, IntWritable> {
  
  @Override
  public void reduce(Text key, Iterable<IntWritable> values,
      Context context)
      throws IOException, InterruptedException {
    
    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    context.write(key, new IntWritable(maxValue));
  }
}
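The Reducer can be unit-tested the same way; this sketch assumes the same Mockito approach as the Mapper test above:

package com.rajkrrsingh.hadoop;

import static org.mockito.Mockito.*;

import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.junit.Test;

public class MaxTemperatureReducerTest {

  @Test
  public void returnsMaximumIntegerInValues() throws Exception {
    MaxTemperatureReducer reducer = new MaxTemperatureReducer();
    MaxTemperatureReducer.Context context = mock(MaxTemperatureReducer.Context.class);
    // The reducer should emit the maximum of all temperatures seen for a year.
    reducer.reduce(new Text("1950"),
        Arrays.asList(new IntWritable(10), new IntWritable(5)), context);
    verify(context).write(new Text("1950"), new IntWritable(10));
  }
}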


Our main class executes the job through a Spring ApplicationContext, so we need to set one up; before that, we provide the application-specific properties file as follows.
application.properties
fs.default.name=hdfs://master:54310
mapred.job.tracker=master:54311

input.path=/user/rajkrrsingh/SAMPLE
output.path=/user/rajkrrsingh/MaxTempOut

In the application.properties file we provide the NameNode location, the JobTracker location, and the input and output paths of the data. The input data can be downloaded from here.
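Before running the job, the input data has to be in HDFS at the configured input path. Assuming the downloaded file is on your local disk (the file name below is just a placeholder), you can upload it like this:

$hadoop fs -mkdir /user/rajkrrsingh/SAMPLE
$hadoop fs -put sample-weather-data.txt /user/rajkrrsingh/SAMPLE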

Now we need to set up our ApplicationContext. In the configuration below, the Hadoop connection settings are declared inside the hdp:configuration element. The job itself is set up in the hdp:job element, where we supply the input and output paths, the driver class (jar-by-class), and our Mapper and Reducer.

The last piece is the job runner, which invokes the configured job. If you have multiple MapReduce jobs, they can be configured in the same applicationContext.xml; see the sketch after the configuration below.

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:hdp="http://www.springframework.org/schema/hadoop"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">

    <!-- Configures the properties file. -->
    <context:property-placeholder location="classpath:application.properties" />

    <!-- Configures Apache Hadoop -->
    <hdp:configuration>
        fs.default.name=${fs.default.name}
        mapred.job.tracker=${mapred.job.tracker}
    </hdp:configuration>

    <!-- Configures the Max Temp job. -->
    <hdp:job id="MaxTempJob"
             input-path="${input.path}"
             output-path="${output.path}"
             jar-by-class="com.rajkrrsingh.hadoop.MaxTemperature"
             mapper="com.rajkrrsingh.hadoop.MaxTemperatureMapper"
             reducer="com.rajkrrsingh.hadoop.MaxTemperatureReducer"/>

    <!-- Configures the job runner that runs the Hadoop jobs. -->
    <hdp:job-runner id="MaxTempJobRunner" job-ref="MaxTempJob" run-at-startup="true"/>
</beans>
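On the multiple-job case mentioned above: to the best of my knowledge, Spring for Apache Hadoop lets a single runner reference several configured jobs through a comma-separated job-ref list, executing them in sequence. MaxTempJob2 below is a hypothetical second job, shown only to illustrate the wiring:

<!-- Hypothetical: one runner driving two configured jobs, one after the other. -->
<hdp:job-runner id="MultiJobRunner" job-ref="MaxTempJob, MaxTempJob2" run-at-startup="true"/>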


Let's load the application context in our job driver class, MaxTemperature.java, as follows. Since the job runner is configured with run-at-startup="true", instantiating the context is all it takes to submit the job.
package com.rajkrrsingh.hadoop;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class MaxTemperature {

  public static void main(String[] args) throws Exception {
    // Bootstrapping the context triggers the job runner (run-at-startup="true"),
    // which submits MaxTempJob to the cluster.
    ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext.xml");
  }
}

Now it's time to package our jar and run it on the cluster. For that I have provided the assembly.xml referenced by the assembly plugin (src/main/assembly/assembly.xml). Run mvn assembly:assembly, which creates a zip file in which you will find SpringHadoopJob.jar (the finalName configured in the pom.xml).
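The descriptor itself isn't listed in this post; a minimal sketch that places the job jar at the root of the zip and its dependencies under lib/ (matching the classpathPrefix in the jar plugin's manifest) might look like this. The exact file in the repository may differ:

<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
    <id>bin</id>
    <formats>
        <format>zip</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <fileSets>
        <!-- The job jar itself goes at the root of the zip. -->
        <fileSet>
            <directory>${project.build.directory}</directory>
            <outputDirectory>/</outputDirectory>
            <includes>
                <include>SpringHadoopJob.jar</include>
            </includes>
        </fileSet>
    </fileSets>
    <dependencySets>
        <!-- Dependencies land in lib/, matching the manifest's classpathPrefix. -->
        <dependencySet>
            <outputDirectory>lib</outputDirectory>
            <useProjectArtifact>false</useProjectArtifact>
        </dependencySet>
    </dependencySets>
</assembly>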

Now, on the NameNode, run your jar as follows:

$hadoop jar SpringHadoopJob.jar

After successful completion, check the output directory.
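Something like the following prints the result; the exact part file name can vary:

$hadoop fs -cat /user/rajkrrsingh/MaxTempOut/part-r-00000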