Monday, August 26, 2013

Import a TSV file from HDFS to HBase using an Identity Mapper


In this tutorial I am going to demonstrate how to import a tab-separated file stored on HDFS into an HBase table.

Let's start by creating a table in HBase.

Step 1 : Create a table in HBase named orders

with the column families 'ship_to_address','ord_date','ship_date','item','status','price':

create 'orders','ship_to_address','ord_date','ship_date','item','status','price'
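
Once the table is created, you can confirm its column families with the describe command in the HBase shell:

describe 'orders'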

Here is our input TSV file stored on HDFS (the fields are country, state, city, order date, ship date, item, status, and price):

USA NY New York 28-07-2013 29-07-2013 Toner shipped 200$
USA California San Francisco 29-07-2013 29-07-2013 Cati in process 150$
USA NY Rochester 28-07-2013 28-07-2013 Toner shipped 200$
USA NY Syracuse 21-07-2013 23-07-2013 Paper shipped 80$
USA NY Albany 21-07-2013 21-07-2013 Paper failed 80$
USA California Long Beach 26-07-2013 28-07-2013 Toner shipped 200$
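
If the feed is still sitting on the local file system, copy it into HDFS first. A minimal sketch, assuming the local file is named orders.tsv and the target matches the input path used in Step 3:

hadoop fs -mkdir -p /home/rajkrrsingh/mspfeed
hadoop fs -put orders.tsv /home/rajkrrsingh/mspfeed/ordfeed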

Step 2 : Write your identity Mapper class as follows. The mapper splits each line on tabs and emits one Put per order; since the sample feed has no dedicated key column, a composite row key (city + order date) is used here:
import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;

public class ImportFromTSVMapper extends
        Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> {

    @Override
    public void map(LongWritable offset, Text line, Context context)
            throws IOException {
        try {
            // Each input line carries 8 tab-separated fields:
            // country, state, city, ord_date, ship_date, item, status, price.
            String[] arr = line.toString().split("\t");
            if (arr.length < 8) {
                return; // skip malformed lines
            }

            // The feed has no dedicated key column, so a composite row key
            // (city + order date) is used here; replace it with whatever
            // uniquely identifies an order in your data.
            byte[] rowKey = Bytes.toBytes(arr[2] + "_" + arr[3]);

            Put put = new Put(rowKey);
            put.add(Bytes.toBytes("ship_to_address"), Bytes.toBytes("country"), Bytes.toBytes(arr[0]));
            put.add(Bytes.toBytes("ship_to_address"), Bytes.toBytes("state"), Bytes.toBytes(arr[1]));
            put.add(Bytes.toBytes("ship_to_address"), Bytes.toBytes("city"), Bytes.toBytes(arr[2]));
            put.add(Bytes.toBytes("ord_date"), Bytes.toBytes("ord_date"), Bytes.toBytes(arr[3]));
            put.add(Bytes.toBytes("ship_date"), Bytes.toBytes("ship_date"), Bytes.toBytes(arr[4]));
            put.add(Bytes.toBytes("item"), Bytes.toBytes("item"), Bytes.toBytes(arr[5]));
            put.add(Bytes.toBytes("status"), Bytes.toBytes("status"), Bytes.toBytes(arr[6]));
            put.add(Bytes.toBytes("price"), Bytes.toBytes("price"), Bytes.toBytes(arr[7]));

            context.write(new ImmutableBytesWritable(rowKey), put);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Step 3 : Write your main class to configure the MapReduce job:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class ImportTSVFile {

    public static void main(String[] args) throws Exception {
        // Picks up the HBase/ZooKeeper settings from hbase-site.xml on the classpath.
        Configuration conf = HBaseConfiguration.create();

        // Table created in Step 1 and the HDFS path of the TSV feed.
        String table = "orders";
        String input = "/home/rajkrrsingh/mspfeed/ordfeed";

        Job job = new Job(conf, "Import from hdfs to hbase");
        job.setJarByClass(ImportTSVFile.class);
        job.setMapperClass(ImportFromTSVMapper.class);

        // Write the Puts emitted by the mapper directly into the HBase table.
        job.setOutputFormatClass(TableOutputFormat.class);
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Writable.class);

        // Map-only job: the identity mapper writes straight to HBase, so no reducers are needed.
        job.setNumReduceTasks(0);

        FileInputFormat.addInputPath(job, new Path(input));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
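
To launch the job, package both classes into a jar and submit it with the hadoop command. A minimal sketch, assuming the jar is named importtsv.jar and the classes are in the default package; the hbase classpath command adds the HBase client jars to the job's classpath:

export HADOOP_CLASSPATH=$(hbase classpath)
hadoop jar importtsv.jar ImportTSVFile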


The result can be verified from the HBase shell; your TSV file has been imported into the HBase table.
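
For example, scanning the first few rows from the HBase shell shows the imported cells:

scan 'orders', {LIMIT => 3}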