Monday, October 21, 2013

Hadoop: How to read and write sequence files using MapReduce


Sequence files are a Hadoop-specific file format that stores data as binary key-value pairs and can serve as an archive format, similar to tar and zip. The idea is to merge a set of small files into a single file of key-value pairs, typically using each file name as the key and the file contents as the value; files created this way are known as 'Hadoop sequence files'.

A sequence file consists of a header followed by one or more records. The first three bytes of a sequence file are the bytes SEQ, which act as a magic number, followed by a single byte representing the version number. The header contains other fields, including the names of the key and value classes, compression details, user-defined metadata, and the sync marker. The sync marker allows a reader to synchronize to a record boundary from any position in the file. Each file has a randomly generated sync marker, whose value is stored in the header. Sync markers appear between records in the sequence file, but they are designed to incur less than a 1% storage overhead, so they don't necessarily appear between every pair of records (as is the case for short records).
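To see the header for yourself, here is a minimal sketch (the class name is mine, and it assumes you pass the path of an existing local sequence file) that reads the first four bytes and prints the magic number and version:

package com.rjkrsinghhadoop;

import java.io.DataInputStream;
import java.io.FileInputStream;

// Quick header check: the first three bytes should be 'S','E','Q',
// and the fourth byte is the format version.
public class SeqHeaderCheck {
    public static void main(String[] args) throws Exception {
        DataInputStream in = new DataInputStream(new FileInputStream(args[0]));
        try {
            byte[] magic = new byte[3];
            in.readFully(magic);                 // expect the ASCII bytes "SEQ"
            int version = in.readUnsignedByte(); // single version byte
            System.out.println(new String(magic, "US-ASCII") + " v" + version);
        } finally {
            in.close();
        }
    }
}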



The internal format of the records depends on whether compression is enabled, and if it is, whether it is record compression or block compression.

If no compression is enabled (the default), then each record is made up of the record length (in bytes), the key length, the key, and then the value. The length fields are written as four-byte integers adhering to the contract of the writeInt() method of java.io.DataOutput. Keys and values are serialized using the Serialization defined for the class being written to the sequence file.
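Before wiring this into MapReduce, it can help to see records written and read with the SequenceFile API directly. The following is a minimal sketch against the Hadoop 1.x API used in this post; the /tmp/orders.seq path is just an illustration:

package com.rjkrsinghhadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("/tmp/orders.seq"); // hypothetical path

        // Write: each append() emits one record
        // (record length, key length, key, value)
        SequenceFile.Writer writer =
                SequenceFile.createWriter(fs, conf, path, Text.class, Text.class);
        try {
            writer.append(new Text("965412"), new Text("S986512"));
            writer.append(new Text("965413"), new Text("S986513"));
        } finally {
            writer.close();
        }

        // Read the records back in order
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        try {
            Text key = new Text();
            Text value = new Text();
            while (reader.next(key, value)) {
                System.out.println(key + "\t" + value);
            }
        } finally {
            reader.close();
        }
    }
}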

In this sample code I will demonstrate how to read and write sequence files using MapReduce. The complete code is available on my Git repo.

We will use the following sample data (the two fields are separated by a tab):
#custId orderNo
965412 S986512
965413 S986513
965414 S986514
965415 S986515
965416 S986516

Configure the Hadoop-related dependencies in the pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.rjkrsinghhadoop</groupId>
  <artifactId>SequenceFileReaderWriter</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>SequenceFileReaderWriter</name>
  <url>http://maven.apache.org</url>
  <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.7</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>1.0.4</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging-api</artifactId>
            <version>1.0.4</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.0.4</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.2</version>
        </dependency>
    </dependencies>


    <build>
        <finalName>exploringhadoop</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <finalName>${project.name}-${project.version}</finalName>
                    <appendAssemblyId>true</appendAssemblyId>
                    <descriptors>
                        <descriptor>src/main/assembly/assembly.xml</descriptor>
                    </descriptors>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Now create a mapper class as follows:

package com.rjkrsinghhadoop;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Identity mapper: every input key/value pair is written to the output
// unchanged; the input and output formats do the actual conversion.
public class SequenceFileWriterMapper extends Mapper<Text, Text, Text, Text> {

    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(key, value);
    }
}

Create a Java class SequenceFileWriterApp, which will convert the text file into a sequence file:

package com.rjkrsinghhadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SequenceFileWriterApp {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: SequenceFileWriterApp <input path> <output path>");
            System.exit(-1);
        }
        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setJarByClass(SequenceFileWriterApp.class);
        job.setJobName("SequenceFileWriter");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(SequenceFileWriterMapper.class);

        // Read tab-separated text lines as key/value pairs and
        // write them back out as a binary sequence file
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Map-only job: no reduce phase is needed for a straight conversion
        job.setNumReduceTasks(0);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
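With the assembled jar described later in this post, the writer job can be launched with the standard hadoop jar command; the HDFS paths here are only illustrative:

hadoop jar target/SequenceFileReaderWriter-1.0-SNAPSHOT-job.jar \
    com.rjkrsinghhadoop.SequenceFileWriterApp /user/hadoop/orders.txt /user/hadoop/orders-seq

Note that KeyValueTextInputFormat splits each input line at the first tab, so the custId becomes the key and the orderNo the value; with zero reduce tasks, the mapper's output goes straight into the sequence file.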

To read a sequence file and convert it back to a text file, we need a SequenceFileReader:

package com.rjkrsinghhadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SequenceFileReader {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: SequenceFileReader <input path> <output path>");
            System.exit(-1);
        }
        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setJarByClass(SequenceFileReader.class);
        job.setJobName("SequenceFileReader");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // The same identity mapper works in both directions
        job.setMapperClass(SequenceFileWriterMapper.class);

        // Read the binary sequence file and write plain text back out
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Map-only job, as before
        job.setNumReduceTasks(0);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
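The reader job is run the same way, pointing it at the writer's output; paths again are illustrative:

hadoop jar target/SequenceFileReaderWriter-1.0-SNAPSHOT-job.jar \
    com.rjkrsinghhadoop.SequenceFileReader /user/hadoop/orders-seq /user/hadoop/orders-txt

To inspect a sequence file without running a job at all, hadoop fs -text will decode it (a map-only job names its output files part-m-00000 and so on):

hadoop fs -text /user/hadoop/orders-seq/part-m-00000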

To ship your code as a jar file we will need an assembly descriptor; create an assembly.xml in src/main/assembly (the location referenced in the pom above) as follows:
<assembly
    xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <id>job</id>
    <formats>
        <format>jar</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <dependencySets>
        <dependencySet>
            <unpack>false</unpack>
            <scope>runtime</scope>
            <outputDirectory>lib</outputDirectory>
            <excludes>
                <exclude>${artifact.groupId}:${artifact.artifactId}</exclude>
            </excludes>
        </dependencySet>
        <dependencySet>
            <unpack>false</unpack>
            <scope>system</scope>
            <outputDirectory>lib</outputDirectory>
            <excludes>
                <exclude>${artifact.groupId}:${artifact.artifactId}</exclude>
            </excludes>
        </dependencySet>
    </dependencySets>
    <fileSets>
        <fileSet>
            <directory>${basedir}/target/classes</directory>
            <outputDirectory>/</outputDirectory>
            <excludes>
                <exclude>*.jar</exclude>
            </excludes>
        </fileSet>
    </fileSets>
</assembly>

Now run mvn assembly:assembly, which will create a jar file in the target directory that is ready to be run on your Hadoop cluster.