Monday, October 21, 2013

Hadoop : Merging Small tar files to the Sequence File

The Hadoop Distributed File System (HDFS) is a distributed file system. It is mainly designed for batch processing of large volume of data. The default block size of HDFS is 64MB. When data is represented in files significantly smaller than the default block size the performance degrades dramatically. Mainly there are two reasons for producing small files. One reason is some files are pieces of a larger logical file (e.g. - log files). Since HDFS has only recently supported appends, these unbounded files are saved by writing them in chunks into HDFS. Other reason is some files cannot be combined together into one larger file and are essentially small. e.g. - A large corpus of images where each image is a distinct file.

Solution to the small files by merging them into a Sequence File:

Sequence files is a Hadoop specific archive file format similar to tar and zip. The concept behind this is to merge the file set with using a key and a value pair and this created files known as ‘Hadoop Sequence Files’. In this method file name is used as the key and the file content is used as value.

In the proposed solution we will demonstrate how to write small files to the Sequence File and a Sequence file reader which will list the file name in Sequence File:

Setting up a local file system:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class LocalSetup {

    private FileSystem fileSystem;
    private Configuration config;

    
    public LocalSetup() throws Exception {
        config = new Configuration();

        
        config.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");

        fileSystem = FileSystem.get(config);
        if (fileSystem.getConf() == null) {
                throw new Exception("LocalFileSystem configuration is null");
        }
    }

    
    public Configuration getConf() {
        return config;
    }

    
    public FileSystem getLocalFileSystem() {
        return fileSystem;
    }
}

In the next course of action we will setup a class which will read from the .tar.gz,.tgz,.tar.bz2 extension files and write it to the Sequence File with key as the name of file and value be the content of the file:
import org.apache.tools.bzip2.CBZip2InputStream;
import org.apache.tools.tar.TarEntry;
import org.apache.tools.tar.TarInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;



public class TarToSeqFile {

    private File inputFile;
    private File outputFile;
    private LocalSetup setup;

    
    public TarToSeqFile() throws Exception {
        setup = new LocalSetup();
    }

    
    public void setInput(File inputFile) {
        this.inputFile = inputFile;
    }

    public void setOutput(File outputFile) {
        this.outputFile = outputFile;
    }

    public void execute() throws Exception {
        TarInputStream input = null;
        SequenceFile.Writer output = null;
        try {
            input = openInputFile();
            output = openOutputFile();
            TarEntry entry;
            while ((entry = input.getNextEntry()) != null) {
                if (entry.isDirectory()) { continue; }
                String filename = entry.getName();
                byte[] data = TarToSeqFile.getBytes(input, entry.getSize());
                
                Text key = new Text(filename);
                BytesWritable value = new BytesWritable(data);
                output.append(key, value);
            }
        } finally {
            if (input != null) { input.close(); }
            if (output != null) { output.close(); }
        }
    }

    private TarInputStream openInputFile() throws Exception {
        InputStream fileStream = new FileInputStream(inputFile);
        String name = inputFile.getName();
        InputStream theStream = null;
        if (name.endsWith(".tar.gz") || name.endsWith(".tgz")) {
            theStream = new GZIPInputStream(fileStream);
        } else if (name.endsWith(".tar.bz2") || name.endsWith(".tbz2")) {
            fileStream.skip(2);
            theStream = new CBZip2InputStream(fileStream);
        } else {
            theStream = fileStream;
        }
        return new TarInputStream(theStream);
    }

    private SequenceFile.Writer openOutputFile() throws Exception {
        Path outputPath = new Path(outputFile.getAbsolutePath());
        return SequenceFile.createWriter(setup.getLocalFileSystem(), setup.getConf(),
                                         outputPath,
                                         Text.class, BytesWritable.class,
                                         SequenceFile.CompressionType.BLOCK);
    }

    
    private static byte[] getBytes(TarInputStream input, long size) throws Exception {
        if (size > Integer.MAX_VALUE) {
            throw new Exception("A file in the tar archive is too large.");
        }
        int length = (int)size;
        byte[] bytes = new byte[length];

        int offset = 0;
        int numRead = 0;

        while (offset < bytes.length &&
               (numRead = input.read(bytes, offset, bytes.length - offset)) >= 0) {
            offset += numRead;
        }

        if (offset < bytes.length) {
            throw new IOException("A file in the tar archive could not be completely read.");
        }

        return bytes;
    }

    
    public static void main(String[] args) {
        if (args.length != 2) {
            exitWithHelp();
        }

        try {
            TarToSeqFile me = new TarToSeqFile();
            me.setInput(new File(args[0]));
            me.setOutput(new File(args[1]));
            me.execute();
        } catch (Exception e) {
            e.printStackTrace();
            exitWithHelp();
        }
    }

    public static void exitWithHelp() {
        System.err.println("Usage:  <tarfile> TarToSeqFile  <output>\n\n" +
                           "<tarfile> may be GZIP or BZIP2 compressed, must have a\n" +
                           "recognizable extension .tar, .tar.gz, .tgz, .tar.bz2, or .tbz2.");
        System.exit(1);
    }
}

In this way we can write files to a single Sequence file, to test it further we will read from the Sequence file and list the keys of the file as output
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;


public class SeqKeyList {

    private String inputFile;
    private LocalSetup setup;

    public SeqKeyList() throws Exception {
        setup = new LocalSetup();
    }

    public void setInput(String filename) {
        inputFile = filename;
    }

    
    public void execute() throws Exception {
        Path path = new Path(inputFile);
        SequenceFile.Reader reader = 
            new SequenceFile.Reader(setup.getLocalFileSystem(), path, setup.getConf());

        try {
            System.err.println("Key type is " + reader.getKeyClassName());
            System.err.println("Value type is " + reader.getValueClassName());
            if (reader.isCompressed()) {
                System.err.println("Values are compressed.");
            }
            if (reader.isBlockCompressed()) {
                System.err.println("Records are block-compressed.");
            }
            System.err.println("Compression type is " + reader.getCompressionCodec().getClass().getName());
            System.err.println("");

            Writable key = (Writable)(reader.getKeyClass().newInstance());
            while (reader.next(key)) {
                System.out.println(key.toString());
            }
        } finally {
            reader.close();
        }
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            exitWithHelp();
        }

        try {
            SeqKeyList me = new SeqKeyList();
            me.setInput(args[0]);
            me.execute();
        } catch (Exception e) {
            e.printStackTrace();
            exitWithHelp();
        }
    }

    
    public static void exitWithHelp() {
        System.err.println("Usage: SeqKeyList   <sequence-file>\n" +
                           "Prints a list of keys in the sequence file, one per line.");
        System.exit(1);
    }
}