Monday, October 28, 2013

Hadoop : Joining two datasets using Map Side Join

It’s inevitable that you’ll come across data analyses where you need to pull in data from different sources. For example, given our customer and order data sets, you may want to match orders with the customers who placed them, along with customer details such as contact information. You’ll have to look at the customer data (cust_data.txt) as well as the order data (order_custid.txt). In the database world it would just be a matter of joining two tables, and most databases automatically take care of the join processing for you. Unfortunately, joining data in Hadoop is more involved, and there are several possible approaches with different trade-offs.
You can download the complete code from my GitHub.
In this example I will demonstrate how to perform a map-side join using the distributed cache.

Input Data : order_custid.txt
..............................................................................

781571544 S9876126
781571545 S9876127
781571546 S9876128
781571547 S9876129
781571548 S9876130
781571549 S9876131
781571550 S9876132
781571551 S9876133
781571552 S9876134
And here is our customer data set:
Customer Data : cust_data.txt
.............................................................................

781571544 Smith,John      (248)-555-9430  jsmith@aol.com
781571545 Hunter,April    (815)-555-3029  april@showers.org
781571546 Ching,Iris      (305)-555-0919  iching@zen.org
781571547 Doe,John        (212)-555-0912  jdoe@morgue.com
781571548 Jones,Tom       (312)-555-3321  tj2342@aol.com
781571549 Smith,John      (607)-555-0023  smith@pocahontas.com
781571550 Crosby,Dave     (405)-555-1516  cros@csny.org
781571551 Johns,Pam       (313)-555-6790  pj@sleepy.com
781571552 Jetter,Linda    (810)-555-8761  netless@earthlink.net

We will create our mapper class; in its setup() method we will parse the order_custid.txt file available in the distributed cache and keep each customer id and order number in a HashMap.

package com.rajkrrsingh.hadoop.mapjoin;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
 
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
 
public class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
 
        private static HashMap<String, String> CustIdOrderMap = new HashMap<String, String>();
        private BufferedReader brReader;
        private String orderNO = "";
        private Text outKey = new Text("");
        private Text outValue = new Text("");
 
        enum MYCOUNTER {
                RECORD_COUNT, FILE_EXISTS, FILE_NOT_FOUND, SOME_OTHER_ERROR
        }
 
        @Override
        protected void setup(Context context) throws IOException,
                        InterruptedException {
 
                Path[] cacheFilesLocal = DistributedCache.getLocalCacheFiles(context
                                .getConfiguration());
 
                for (Path eachPath : cacheFilesLocal) {
                        if (eachPath.getName().toString().trim().equals("order_custid.txt")) {
                                context.getCounter(MYCOUNTER.FILE_EXISTS).increment(1);
                                setupOrderHashMap(eachPath, context);
                        }
                }
 
        }
 
        private void setupOrderHashMap(Path filePath, Context context)
                        throws IOException {
 
                String strLineRead = "";
 
                try {
                        brReader = new BufferedReader(new FileReader(filePath.toString()));
 
                        while ((strLineRead = brReader.readLine()) != null) {
                                String custIdOrderArr[] = strLineRead.toString().split("\\s+");
                                CustIdOrderMap.put(custIdOrderArr[0].trim(), custIdOrderArr[1].trim());
                        }
                } catch (FileNotFoundException e) {
                        e.printStackTrace();
                        context.getCounter(MYCOUNTER.FILE_NOT_FOUND).increment(1);
                } catch (IOException e) {
                        context.getCounter(MYCOUNTER.SOME_OTHER_ERROR).increment(1);
                        e.printStackTrace();
                }finally {
                        if (brReader != null) {
                                brReader.close();
 
                        }
 
                }
        }
 
        @Override
        public void map(LongWritable key, Text value, Context context)
                        throws IOException, InterruptedException {
 
                context.getCounter(MYCOUNTER.RECORD_COUNT).increment(1);
 
                if (value.toString().length() > 0) {
                        String custDataArr[] = value.toString().split("\\s+");
 
                        // look up the order number for this customer id; guard against a
                        // missing entry (HashMap.get() returns null, so check with == null)
                        orderNO = CustIdOrderMap.get(custDataArr[0].trim());
                        if (orderNO == null || orderNO.isEmpty()) {
                                orderNO = "NOT-FOUND";
                        }
 
                        outKey.set(custDataArr[0].toString());
 
                        outValue.set(custDataArr[1].toString() + "\t"
                                        + custDataArr[2].toString() + "\t"
                                        + custDataArr[3].toString() + "\t" + orderNO);
 
                        // emit only when the input line actually contained a record
                        context.write(outKey, outValue);
                }
                orderNO = "";
        }
}

Set up the driver class as follows:
package com.rajkrrsingh.hadoop.mapjoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class App extends Configured implements Tool
{
    public static void main( String[] args ) throws Exception
    {
            int exitCode = ToolRunner.run(new Configuration(), new App(), args);
            System.exit(exitCode);
    }
    @Override
    public int run(String[] args) throws Exception {
            if(args.length !=2 ){
                    System.err.println("Usage : App -files <location-to-cust-id-and-order-file> <input path> <output path>");
                    System.exit(-1);
            }
            Job job = new Job(getConf());
            job.setJobName("Map Side Join");
            job.setJarByClass(App.class);
            FileInputFormat.addInputPath(job,new Path(args[0]) );
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            job.setMapperClass(MapSideJoinMapper.class);
            job.setNumReduceTasks(0);
            
            boolean success = job.waitForCompletion(true);
                return success ? 0 : 1;
            
    }
}
Here we set the number of reducers to 0 so that the map output is written directly to the output files.
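The order_custid.txt lookup file reaches each mapper through the distributed cache: because the driver is launched via ToolRunner, the -files generic option in the command below takes care of shipping it to the task nodes. If you prefer not to rely on the command line, a roughly equivalent driver fragment (a sketch of my own, not part of the original post; the HDFS path is only a placeholder) could register the file programmatically:

import java.net.URI;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.Job;

public class CacheFileSetup {

        // Roughly what the -files option does behind the scenes: the file is copied
        // to every task node and shows up in DistributedCache.getLocalCacheFiles()
        // inside the mapper's setup(). The HDFS path below is only a placeholder.
        public static void addLookupFile(Job job) throws Exception {
                DistributedCache.addCacheFile(new URI("/user/rks/cache/order_custid.txt"),
                                job.getConfiguration());
        }
}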
Now create a jar and run the following command on your Hadoop cluster:
$bin/hadoop jar /home/rks/example/HadoopMapSideJoin/target/HadoopMapSideJoin.jar com.rajkrrsingh.hadoop.mapjoin.App -files /home/rks/Downloads/order_custid.txt input output

That is what we need, and here is the result.
Output:
.................................................................................
781571544 Smith,John (248)-555-9430 jsmith@aol.com S9876126
781571545 Hunter,April (815)-555-3029 april@showers.org S9876127
781571546 Ching,Iris (305)-555-0919 iching@zen.org S9876128
781571547 Doe,John (212)-555-0912 jdoe@morgue.com S9876129
781571548 Jones,Tom (312)-555-3321 tj2342@aol.com S9876130
781571549 Smith,John (607)-555-0023 smith@pocahontas.com S9876131
781571550 Crosby,Dave (405)-555-1516 cros@csny.org S9876132
781571551 Johns,Pam (313)-555-6790 pj@sleepy.com S9876133
781571552 Jetter,Linda (810)-555-8761 netless@earthlink.net S9876134

Monday, October 21, 2013

Hadoop : Merging Small tar files to the Sequence File

The Hadoop Distributed File System (HDFS) is a distributed file system designed mainly for batch processing of large volumes of data. The default block size of HDFS is 64 MB. When data is represented in files significantly smaller than the default block size, performance degrades dramatically. There are mainly two reasons for producing small files. One reason is that some files are pieces of a larger logical file (e.g. log files); since HDFS has only recently supported appends, these unbounded files are saved by writing them in chunks into HDFS. The other reason is that some files are inherently small and cannot be combined into one larger file, e.g. a large corpus of images where each image is a distinct file.

Solution to the small files by merging them into a Sequence File:

A sequence file is a Hadoop-specific archive file format, similar in spirit to tar and zip. The idea is to merge a set of small files into a single file of key-value pairs, known as a ‘Hadoop sequence file’. In this approach the file name is used as the key and the file content as the value.
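As a quick illustration of that idea (a minimal sketch of my own, not part of the original solution; the input file and output path are taken from the command line), a single small file could be packed into a sequence file like this:

import java.io.File;
import java.io.FileInputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SingleFileToSeq {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.getLocal(conf);

        File small = new File(args[0]);      // the small file to pack
        Path seqPath = new Path(args[1]);    // the sequence file to create

        // read the whole file into memory (fine for genuinely small files)
        byte[] data = new byte[(int) small.length()];
        FileInputStream in = new FileInputStream(small);
        try {
            int offset = 0;
            while (offset < data.length) {
                int n = in.read(data, offset, data.length - offset);
                if (n < 0) break;
                offset += n;
            }
        } finally {
            in.close();
        }

        // key = file name, value = file content
        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, seqPath,
                Text.class, BytesWritable.class);
        try {
            writer.append(new Text(small.getName()), new BytesWritable(data));
        } finally {
            writer.close();
        }
    }
}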

In the proposed solution we will demonstrate how to write small files into a sequence file, along with a sequence file reader that lists the file names stored in it:

Setting up a local file system:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class LocalSetup {

    private FileSystem fileSystem;
    private Configuration config;

    
    public LocalSetup() throws Exception {
        config = new Configuration();

        
        config.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");

        fileSystem = FileSystem.get(config);
        if (fileSystem.getConf() == null) {
                throw new Exception("LocalFileSystem configuration is null");
        }
    }

    
    public Configuration getConf() {
        return config;
    }

    
    public FileSystem getLocalFileSystem() {
        return fileSystem;
    }
}

Next we will set up a class that reads .tar, .tar.gz/.tgz, and .tar.bz2/.tbz2 archives and writes each entry to the sequence file, with the file name as the key and the file content as the value (the TarInputStream, TarEntry, and CBZip2InputStream classes come from the Apache Ant libraries):
import org.apache.tools.bzip2.CBZip2InputStream;
import org.apache.tools.tar.TarEntry;
import org.apache.tools.tar.TarInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;



public class TarToSeqFile {

    private File inputFile;
    private File outputFile;
    private LocalSetup setup;

    
    public TarToSeqFile() throws Exception {
        setup = new LocalSetup();
    }

    
    public void setInput(File inputFile) {
        this.inputFile = inputFile;
    }

    public void setOutput(File outputFile) {
        this.outputFile = outputFile;
    }

    public void execute() throws Exception {
        TarInputStream input = null;
        SequenceFile.Writer output = null;
        try {
            input = openInputFile();
            output = openOutputFile();
            TarEntry entry;
            while ((entry = input.getNextEntry()) != null) {
                if (entry.isDirectory()) { continue; }
                String filename = entry.getName();
                byte[] data = TarToSeqFile.getBytes(input, entry.getSize());
                
                Text key = new Text(filename);
                BytesWritable value = new BytesWritable(data);
                output.append(key, value);
            }
        } finally {
            if (input != null) { input.close(); }
            if (output != null) { output.close(); }
        }
    }

    private TarInputStream openInputFile() throws Exception {
        InputStream fileStream = new FileInputStream(inputFile);
        String name = inputFile.getName();
        InputStream theStream = null;
        if (name.endsWith(".tar.gz") || name.endsWith(".tgz")) {
            theStream = new GZIPInputStream(fileStream);
        } else if (name.endsWith(".tar.bz2") || name.endsWith(".tbz2")) {
            fileStream.skip(2);
            theStream = new CBZip2InputStream(fileStream);
        } else {
            theStream = fileStream;
        }
        return new TarInputStream(theStream);
    }

    private SequenceFile.Writer openOutputFile() throws Exception {
        Path outputPath = new Path(outputFile.getAbsolutePath());
        return SequenceFile.createWriter(setup.getLocalFileSystem(), setup.getConf(),
                                         outputPath,
                                         Text.class, BytesWritable.class,
                                         SequenceFile.CompressionType.BLOCK);
    }

    
    private static byte[] getBytes(TarInputStream input, long size) throws Exception {
        if (size > Integer.MAX_VALUE) {
            throw new Exception("A file in the tar archive is too large.");
        }
        int length = (int)size;
        byte[] bytes = new byte[length];

        int offset = 0;
        int numRead = 0;

        while (offset < bytes.length &&
               (numRead = input.read(bytes, offset, bytes.length - offset)) >= 0) {
            offset += numRead;
        }

        if (offset < bytes.length) {
            throw new IOException("A file in the tar archive could not be completely read.");
        }

        return bytes;
    }

    
    public static void main(String[] args) {
        if (args.length != 2) {
            exitWithHelp();
        }

        try {
            TarToSeqFile me = new TarToSeqFile();
            me.setInput(new File(args[0]));
            me.setOutput(new File(args[1]));
            me.execute();
        } catch (Exception e) {
            e.printStackTrace();
            exitWithHelp();
        }
    }

    public static void exitWithHelp() {
        System.err.println("Usage:  <tarfile> TarToSeqFile  <output>\n\n" +
                           "<tarfile> may be GZIP or BZIP2 compressed, must have a\n" +
                           "recognizable extension .tar, .tar.gz, .tgz, .tar.bz2, or .tbz2.");
        System.exit(1);
    }
}

In this way we can write many small files into a single sequence file. To test it further, we will read from the sequence file and list its keys as output:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;


public class SeqKeyList {

    private String inputFile;
    private LocalSetup setup;

    public SeqKeyList() throws Exception {
        setup = new LocalSetup();
    }

    public void setInput(String filename) {
        inputFile = filename;
    }

    
    public void execute() throws Exception {
        Path path = new Path(inputFile);
        SequenceFile.Reader reader = 
            new SequenceFile.Reader(setup.getLocalFileSystem(), path, setup.getConf());

        try {
            System.err.println("Key type is " + reader.getKeyClassName());
            System.err.println("Value type is " + reader.getValueClassName());
            if (reader.isCompressed()) {
                System.err.println("Values are compressed.");
            }
            if (reader.isBlockCompressed()) {
                System.err.println("Records are block-compressed.");
            }
            System.err.println("Compression type is " + reader.getCompressionCodec().getClass().getName());
            System.err.println("");

            Writable key = (Writable)(reader.getKeyClass().newInstance());
            while (reader.next(key)) {
                System.out.println(key.toString());
            }
        } finally {
            reader.close();
        }
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            exitWithHelp();
        }

        try {
            SeqKeyList me = new SeqKeyList();
            me.setInput(args[0]);
            me.execute();
        } catch (Exception e) {
            e.printStackTrace();
            exitWithHelp();
        }
    }

    
    public static void exitWithHelp() {
        System.err.println("Usage: SeqKeyList   <sequence-file>\n" +
                           "Prints a list of keys in the sequence file, one per line.");
        System.exit(1);
    }
}


Hadoop : How to read and write a Map file

MapFile
A MapFile is a sorted SequenceFile with an index to permit lookups by key. MapFile can be thought of as a persistent form of java.util.Map (although it doesn’t implement this interface), which is able to grow beyond the size of a Map that is kept in memory.

Writing a MapFile
Writing a MapFile is similar to writing a SequenceFile: you create an instance of MapFile.Writer, then call the append() method to add entries in order. (Attempting to add entries out of order will result in an IOException.) Keys must be instances of WritableComparable, and values must be Writable—contrast this to SequenceFile, which can use any serialization framework for its entries.

Iterating through the entries in order in a MapFile is similar to the procedure for a SequenceFile: you create a MapFile.Reader, then call the next() method until it returns false, signifying that no entry was read because the end of the file was reached.
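A minimal sketch of that read loop (my own illustration, assuming Text keys and values and a MapFile directory passed as the first argument):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

public class MapFileIterator {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // args[0] is the MapFile directory (it contains the data and index files)
        MapFile.Reader reader = new MapFile.Reader(fs, args[0], conf);
        try {
            Text key = new Text();
            Text value = new Text();
            // next() returns false once the end of the (sorted) data file is reached
            while (reader.next(key, value)) {
                System.out.println(key + "\t" + value);
            }
        } finally {
            reader.close();
        }
    }
}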

our data sample:
#custId orderNo
965412 S986512
965413 S986513
965414 S986514
965415 S986515
965416 S986516

Now configure your dependencies in pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.rjkrsinghhadoop</groupId>
  <artifactId>MapFileConverter</artifactId>
  <packaging>jar</packaging>
  <version>1.0</version>
  <name>MapFileConverter</name>
  <url>http://maven.apache.org</url>
  <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.7</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>1.0.4</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging-api</artifactId>
            <version>1.0.4</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.0.4</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.2</version>
        </dependency>
    </dependencies>

<!--
    <repositories>
        <repository>
            <id>libdir</id>
            <url>file://${basedir}/lib</url>
        </repository>
    </repositories>
-->

    <build>
        <finalName>exploringhadoop</finalName>
        <plugins>
                        <plugin>
                                <groupId>org.apache.maven.plugins</groupId>
                                <artifactId>maven-compiler-plugin</artifactId>
                                <configuration>
                                        <source>1.6</source>
                                        <target>1.6</target>
                                </configuration>
                        </plugin>
                        <plugin>
                                <artifactId>maven-assembly-plugin</artifactId>
                                <configuration>
                                        <finalName>${project.name}-${project.version}</finalName>
                                        <appendAssemblyId>true</appendAssemblyId>
                                        <descriptors>
                                                <descriptor>src/main/assembly/assembly.xml</descriptor>
                                        </descriptors>
                                </configuration>
                        </plugin>
        </plugins>
    </build>
</project>

Create MapFileConverter.java, which will be responsible for converting the text file to a MapFile:
package com.rjkrsinghhadoop;


import  java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;

public class MapFileConverter {

  @SuppressWarnings("deprecation")
        public static void main(String[] args) throws IOException{
                
                Configuration conf = new Configuration();
                FileSystem fs;
                
                try {
                        fs = FileSystem.get(conf);
                        
                        Path inputFile = new Path(args[0]);
                        Path outputFile = new Path(args[1]);

                        Text txtKey = new Text();
                        Text txtValue = new Text();

                        String strLineInInputFile = "";
                        String lstKeyValuePair[] = null;
                        MapFile.Writer writer = null;
                        
                        FSDataInputStream inputStream = fs.open(inputFile);

                        try {
                                writer = new MapFile.Writer(conf, fs, outputFile.toString(),
                                                txtKey.getClass(), txtValue.getClass());
                                writer.setIndexInterval(1);
                                while (inputStream.available() > 0) {
                                        strLineInInputFile = inputStream.readLine();
                                        lstKeyValuePair = strLineInInputFile.split("\\t");
                                        txtKey.set(lstKeyValuePair[0]);
                                        txtValue.set(lstKeyValuePair[1]);
                                        writer.append(txtKey, txtValue);
                                }
                        } finally {
                                IOUtils.closeStream(writer);
                                System.out.println("Map file created successfully!!");
                  }
        } catch (IOException e) {
                        e.printStackTrace();
                }        
        }
}

To look up a record by the provided key we will use MapFileReader:

package com.rjkrsinghhadoop;


import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;

public class MapFileReader {

  
        @SuppressWarnings("deprecation")
        public static void main(String[] args) throws IOException {

                Configuration conf = new Configuration();
                FileSystem fs = null;
                Text txtKey = new Text(args[1]);
                Text txtValue = new Text();
                MapFile.Reader reader = null;

                try {
                        fs = FileSystem.get(conf);

                        try {
                                reader = new MapFile.Reader(fs, args[0].toString(), conf);
                                reader.get(txtKey, txtValue);
                        } catch (IOException e) {
                                e.printStackTrace();
                        }

                } catch (IOException e) {
                        e.printStackTrace();
                }
                System.out.println("The value for Key "+ txtKey.toString() +" is "+ txtValue.toString());
        }
}

To ship your code in a jar file we will need an assembly descriptor; create an assembly.xml in the resources folder as follows:
<assembly
    xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <id>job</id>
    <formats>
        <format>jar</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <dependencySets>
        <dependencySet>
            <unpack>false</unpack>
            <scope>runtime</scope>
            <outputDirectory>lib</outputDirectory>
            <excludes>
                <exclude>${artifact.groupId}:${artifact.artifactId}</exclude>
            </excludes>
        </dependencySet>
        <dependencySet>
            <unpack>false</unpack>
            <scope>system</scope>
            <outputDirectory>lib</outputDirectory>
            <excludes>
                <exclude>${artifact.groupId}:${artifact.artifactId}</exclude>
            </excludes>
        </dependencySet>
    </dependencySets>
    <fileSets>
        <fileSet>
            <directory>${basedir}/target/classes</directory>
            <outputDirectory>/</outputDirectory>
            <excludes>
                <exclude>*.jar</exclude>
            </excludes>
        </fileSet>
    </fileSets>
</assembly>

Now run mvn assembly:assembly, which will create a jar file in the target directory, ready to be run on your Hadoop cluster.

Hadoop : How to read and write Sequence File using mapreduce


A sequence file is a Hadoop-specific archive file format, similar in spirit to tar and zip. The idea is to merge a set of small files into a single file of key-value pairs, known as a ‘Hadoop sequence file’. In this approach the file name is used as the key and the file content as the value.

A sequence file consists of a header followed by one or more records. The first three bytes of a sequence file are the bytes SEQ, which act as a magic number, followed by a single byte representing the version number. The header contains other fields including the names of the key and value classes, compression details, user-defined metadata, and the sync marker. Recall that the sync marker is used to allow a reader to synchronize to a record boundary from any position in the file. Each file has a randomly generated sync marker, whose value is stored in the header. Sync markers appear between records in the sequence file. They are designed to incur less than a 1% storage overhead, so they don’t necessarily appear between every pair of records (such is the case for short records).



The internal format of the records depends on whether compression is enabled, and if it is, whether it is record compression or block compression.

If no compression is enabled (the default), then each record is made up of the record length (in bytes), the key length, the key, and then the value. The length fields are written as four-byte integers adhering to the contract of the writeInt() method of java.io.DataOutput. Keys and values are serialized using the Serialization defined for the class being written to the sequence file.
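As a small illustration of the header layout described above (my own sketch, not part of the original post), the SEQ magic number and the version byte can be inspected directly from the first four bytes of a file:

import java.io.DataInputStream;
import java.io.FileInputStream;

public class SeqMagicCheck {

    public static void main(String[] args) throws Exception {
        DataInputStream in = new DataInputStream(new FileInputStream(args[0]));
        try {
            // a sequence file starts with the three bytes 'S', 'E', 'Q',
            // followed by a single version byte
            byte[] header = new byte[4];
            in.readFully(header);
            boolean isSeq = header[0] == 'S' && header[1] == 'E' && header[2] == 'Q';
            System.out.println("SEQ magic present: " + isSeq);
            if (isSeq) {
                System.out.println("Sequence file version: " + header[3]);
            }
        } finally {
            in.close();
        }
    }
}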

In this sample code I will demonstrate how to read and write a sequence file using MapReduce. The complete code is available on my Git repo.

We will use the following sample data:
#custId orderNo
965412 S986512
965413 S986513
965414 S986514
965415 S986515
965416 S986516

Configure the Hadoop-related dependencies in the pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.rjkrsinghhadoop</groupId>
  <artifactId>SequenceFileReaderWriter</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>SequenceFileReaderWriter</name>
  <url>http://maven.apache.org</url>
  <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.7</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>1.0.4</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging-api</artifactId>
            <version>1.0.4</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.0.4</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.2</version>
        </dependency>
    </dependencies>

<!--
    <repositories>
        <repository>
            <id>libdir</id>
            <url>file://${basedir}/lib</url>
        </repository>
    </repositories>
-->

    <build>
        <finalName>exploringhadoop</finalName>
        <plugins>
   <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <configuration>
     <source>1.6</source>
     <target>1.6</target>
    </configuration>
   </plugin>
   <plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
     <finalName>${project.name}-${project.version}</finalName>
     <appendAssemblyId>true</appendAssemblyId>
     <descriptors>
      <descriptor>src/main/assembly/assembly.xml</descriptor>
     </descriptors>
    </configuration>
   </plugin>
        </plugins>
    </build>
</project>

Now create a mapper class as follows:

package com.rjkrsinghhadoop;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SequenceFileWriterMapper extends Mapper<Text,Text,Text,Text> {
        
        
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
                context.write(key, value);                
        }

}

Create a Java class SequenceFileWriterApp, which will write a text file to a sequence file:

package com.rjkrsinghhadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SequenceFileWriterApp 
{
    public static void main( String[] args ) throws Exception
    {
            if(args.length !=2 ){
                    System.err.println("Usage : Sequence File Writer Utility <input path> <output path>");
                    System.exit(-1);
            }
            Configuration conf = new Configuration();
            Job job = new Job(conf);
            job.setJarByClass(SequenceFileWriterApp.class);
            job.setJobName("SequenceFileWriter");
            
            FileInputFormat.addInputPath(job,new Path(args[0]) );
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
            job.setMapperClass(SequenceFileWriterMapper.class);
            
            job.setInputFormatClass(KeyValueTextInputFormat.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.setNumReduceTasks(0);
            
            
            System.exit(job.waitForCompletion(true) ? 0:1);
    }
}

To read a sequence file and convert it back to a text file we need a SequenceFileReader:
package com.rjkrsinghhadoop;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SequenceFileReader  {


  public static void main(String[] args) throws Exception {
          if(args.length !=2 ){
                System.err.println("Usage : Sequence File Writer Utility <input path> <output path>");
                System.exit(-1);
        }
        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setJarByClass(SequenceFileReader.class);
        job.setJobName("SequenceFileReader");
        
        FileInputFormat.addInputPath(job,new Path(args[0]) );
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        job.setMapperClass(SequenceFileWriterMapper.class);
        
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0);
        
        
        System.exit(job.waitForCompletion(true) ? 0:1);
}
}

To ship your code in a jar file we will need an assembly descriptor; create an assembly.xml in the resources folder as follows:
<assembly
    xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <id>job</id>
    <formats>
        <format>jar</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <dependencySets>
        <dependencySet>
            <unpack>false</unpack>
            <scope>runtime</scope>
            <outputDirectory>lib</outputDirectory>
            <excludes>
                <exclude>${artifact.groupId}:${artifact.artifactId}</exclude>
            </excludes>
        </dependencySet>
        <dependencySet>
            <unpack>false</unpack>
            <scope>system</scope>
            <outputDirectory>lib</outputDirectory>
            <excludes>
                <exclude>${artifact.groupId}:${artifact.artifactId}</exclude>
            </excludes>
        </dependencySet>
    </dependencySets>
    <fileSets>
        <fileSet>
            <directory>${basedir}/target/classes</directory>
            <outputDirectory>/</outputDirectory>
            <excludes>
                <exclude>*.jar</exclude>
            </excludes>
        </fileSet>
    </fileSets>
</assembly>

Now run mvn assembly:assembly, which will create a jar file in the target directory, ready to be run on your Hadoop cluster.