Monday, October 21, 2013

Hadoop: How to read and write a MapFile

MapFile
A MapFile is a sorted SequenceFile with an index to permit lookups by key. MapFile can be thought of as a persistent form of java.util.Map (although it doesn’t implement this interface), which is able to grow beyond the size of a Map that is kept in memory.
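On disk, a MapFile is actually a directory containing two SequenceFiles: a data file holding all of the entries in sorted key order, and a much smaller index file mapping a fraction of the keys to their offsets in the data file. Listing a MapFile directory (the name here is just illustrative) shows:

ordersMapFile/data
ordersMapFile/index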

Writing a MapFile
Writing a MapFile is similar to writing a SequenceFile: you create an instance of MapFile.Writer, then call the append() method to add entries in order. (Attempting to add entries out of order will result in an IOException.) Keys must be instances of WritableComparable, and values must be Writable—contrast this to SequenceFile, which can use any serialization framework for its entries.
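A minimal sketch of this ordering contract (the path /tmp/demoMapFile and the keys are illustrative only; the full converter program appears further below):

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);

MapFile.Writer writer =
        new MapFile.Writer(conf, fs, "/tmp/demoMapFile", Text.class, Text.class);
writer.append(new Text("965412"), new Text("S986512")); // ascending key: fine
writer.append(new Text("965413"), new Text("S986513")); // still ascending: fine
// writer.append(new Text("965411"), new Text("S986511")); // out of order: IOException
writer.close();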

Iterating through the entries in order in a MapFile is similar to the procedure for a SequenceFile: you create a MapFile.Reader, then call the next() method until it returns false, signifying that no entry was read because the end of the file was reached.
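A minimal sketch of such a scan (assuming the illustrative /tmp/demoMapFile written above):

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);

MapFile.Reader reader = new MapFile.Reader(fs, "/tmp/demoMapFile", conf);
Text key = new Text();
Text value = new Text();
// next() fills in key and value, returning false once the end of the file is reached
while (reader.next(key, value)) {
        System.out.println(key + "\t" + value);
}
reader.close();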

Our sample data (tab-separated):
#custId orderNo
965412 S986512
965413 S986513
965414 S986514
965415 S986515
965416 S986516

Now configure the dependencies in your pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.rjkrsinghhadoop</groupId>
  <artifactId>MapFileConverter</artifactId>
  <packaging>jar</packaging>
  <version>1.0</version>
  <name>MapFileConverter</name>
  <url>http://maven.apache.org</url>
  <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.7</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>1.0.4</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging-api</artifactId>
            <version>1.0.4</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.0.4</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.2</version>
        </dependency>
    </dependencies>

<!--
    <repositories>
        <repository>
            <id>libdir</id>
            <url>file://${basedir}/lib</url>
        </repository>
    </repositories>
-->

    <build>
        <finalName>exploringhadoop</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <finalName>${project.name}-${project.version}</finalName>
                    <appendAssemblyId>true</appendAssemblyId>
                    <descriptors>
                        <descriptor>src/main/assembly/assembly.xml</descriptor>
                    </descriptors>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Create MapFileConverter.java, which will be responsible for converting the text file to a MapFile:
package com.rjkrsinghhadoop;


import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

public class MapFileConverter {

    public static void main(String[] args) throws IOException {

        Configuration conf = new Configuration();

        try {
            FileSystem fs = FileSystem.get(conf);

            // args[0] is the input text file, args[1] the MapFile directory to create
            Path inputFile = new Path(args[0]);
            Path outputFile = new Path(args[1]);

            Text txtKey = new Text();
            Text txtValue = new Text();

            MapFile.Writer writer = null;
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(fs.open(inputFile)));

            try {
                // keys and values are both Text; note the value class must be
                // txtValue.getClass(), not txtKey.getClass()
                writer = new MapFile.Writer(conf, fs, outputFile.toString(),
                        txtKey.getClass(), txtValue.getClass());
                // index every entry (the default interval is 128 keys)
                writer.setIndexInterval(1);

                String line;
                while ((line = reader.readLine()) != null) {
                    // skip the header line and any blank lines
                    if (line.isEmpty() || line.startsWith("#")) {
                        continue;
                    }
                    String[] keyValuePair = line.split("\\t");
                    txtKey.set(keyValuePair[0]);
                    txtValue.set(keyValuePair[1]);
                    // entries must be appended in ascending key order
                    writer.append(txtKey, txtValue);
                }
                System.out.println("Map file created successfully!");
            } finally {
                IOUtils.closeStream(writer);
                IOUtils.closeStream(reader);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
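Note that writer.setIndexInterval(1) puts every key into the index, which makes lookups fast at the price of a larger index that the reader loads into memory. By default a MapFile indexes only every 128th key (the io.map.index.interval property), which is usually the better trade-off for large files.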

To look up a value by its key, we will use MapFileReader:

package com.rjkrsinghhadoop;


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

public class MapFileReader {

    public static void main(String[] args) throws IOException {

        Configuration conf = new Configuration();
        // args[0] is the MapFile directory, args[1] the key to look up
        Text txtKey = new Text(args[1]);
        Text txtValue = new Text();
        MapFile.Reader reader = null;

        try {
            FileSystem fs = FileSystem.get(conf);
            reader = new MapFile.Reader(fs, args[0], conf);

            if (reader.get(txtKey, txtValue) != null) {
                System.out.println("The value for key " + txtKey + " is " + txtValue);
            } else {
                System.out.println("No entry found for key " + txtKey);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeStream(reader);
        }
    }
}
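reader.get() fills in txtValue and returns null when the key is absent, which the code above checks for. If a nearest-match lookup is acceptable, MapFile.Reader also offers getClosest(), which returns the first entry at or after the requested key.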

To ship your code in a jar file we need an assembly descriptor. Create assembly.xml in the src/main/assembly folder (the path the assembly plugin in the pom refers to) as follows:
<assembly
    xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <id>job</id>
    <formats>
        <format>jar</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <dependencySets>
        <dependencySet>
            <unpack>false</unpack>
            <scope>runtime</scope>
            <outputDirectory>lib</outputDirectory>
            <excludes>
                <exclude>${artifact.groupId}:${artifact.artifactId}</exclude>
            </excludes>
        </dependencySet>
        <dependencySet>
            <unpack>false</unpack>
            <scope>system</scope>
            <outputDirectory>lib</outputDirectory>
            <excludes>
                <exclude>${artifact.groupId}:${artifact.artifactId}</exclude>
            </excludes>
        </dependencySet>
    </dependencySets>
    <fileSets>
        <fileSet>
            <directory>${basedir}/target/classes</directory>
            <outputDirectory>/</outputDirectory>
            <excludes>
                <exclude>*.jar</exclude>
            </excludes>
        </fileSet>
    </fileSets>
</assembly>

Now run mvn assembly:assembly, which will create a jar file in the target directory that is ready to be run on your Hadoop cluster.
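Assuming the sample file has been uploaded to HDFS (the paths below are illustrative), the two programs can then be invoked like this; the jar name follows from the assembly configuration above:

hadoop jar target/MapFileConverter-1.0-job.jar com.rjkrsinghhadoop.MapFileConverter /user/hadoop/orders.txt /user/hadoop/ordersMapFile
hadoop jar target/MapFileConverter-1.0-job.jar com.rjkrsinghhadoop.MapFileReader /user/hadoop/ordersMapFile 965414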