In this tutorial I am going to demonstrate how to import a tab-separated (TSV) file stored on HDFS into an HBase table using a MapReduce job.
Let's start by creating a table in HBase.
Step 1 : Create a table in HBase named 'orders'
with the column families 'ship_to_address','ord_date','ship_date','item','status' and 'price'
create 'orders','ship_to_address','ord_date','ship_date','item','status','price'
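If you prefer to create the table from Java instead of the shell, a minimal sketch using the old-style HBaseAdmin API (the same API generation used by the mapper below) could look like this; the class name CreateOrdersTable is only an illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateOrdersTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        // Same table name and column families as the shell command above.
        HTableDescriptor desc = new HTableDescriptor("orders");
        for (String family : new String[] {"ship_to_address", "ord_date",
                "ship_date", "item", "status", "price"}) {
            desc.addFamily(new HColumnDescriptor(family));
        }
        if (!admin.tableExists("orders")) {
            admin.createTable(desc);
        }
        admin.close();
    }
}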
Here is our input TSV file stored on HDFS (fields are tab-separated):
USA NY New York 28-07-2013 29-07-2013 Toner shipped 200$
USA California San Francisco 29-07-2013 29-07-2013 Cati in process 150$
USA NY Rochester 28-07-2013 28-07-2013 Toner shipped 200$
USA NY Syracuse 21-07-2013 23-07-2013 Paper shipped 80$
USA NY Albany 21-07-2013 21-07-2013 Paper failed 80$
USA California Long Beach 26-07-2013 28-07-2013 Toner shipped 200$
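If the feed file is not already on HDFS, you can copy it there with the hadoop fs shell. The local file name orders.tsv below is just an example; the destination matches the input path used by the driver in Step 3:

hadoop fs -put orders.tsv /home/rajkrrsingh/mspfeed/ordfeed
hadoop fs -cat /home/rajkrrsingh/mspfeed/ordfeed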
Step 2 : Write your Mapper class as follows (it parses each TSV line and converts it into an HBase Put):
import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;

public class ImportFromTSVMapper extends
        Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> {

    @Override
    public void map(LongWritable offset, Text line, Context context)
            throws IOException {
        try {
            // Split the TSV line; the first field is used as the row key,
            // the remaining fields populate the column families created in Step 1.
            String lineString = line.toString();
            String[] arr = lineString.split("\t");
            Put put = new Put(arr[0].getBytes());
            put.add("ship_to_address".getBytes(), "country".getBytes(), Bytes.toBytes(arr[1]));
            put.add("ship_to_address".getBytes(), "state".getBytes(),   Bytes.toBytes(arr[2]));
            put.add("ship_to_address".getBytes(), "city".getBytes(),    Bytes.toBytes(arr[3]));
            put.add("ord_date".getBytes(),  "ord_date".getBytes(),  Bytes.toBytes(arr[4]));
            put.add("ship_date".getBytes(), "ship_date".getBytes(), Bytes.toBytes(arr[5]));
            put.add("item".getBytes(),      "item".getBytes(),      Bytes.toBytes(arr[6]));
            put.add("status".getBytes(),    "status".getBytes(),    Bytes.toBytes(arr[7]));
            put.add("price".getBytes(),     "price".getBytes(),     Bytes.toBytes(arr[8]));
            // Emit the row key and the Put; TableOutputFormat writes it to HBase.
            context.write(new ImmutableBytesWritable(arr[0].getBytes()), put);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
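Note that the mapper expects nine tab-separated fields per line: an order id in the first position (used as the row key) followed by the eight data fields shown above. A quick standalone check of that split logic, useful before submitting the job, might look like this; the order id ORD001 is made up for illustration:

public class TSVLineCheck {
    public static void main(String[] args) {
        // Sample line with a leading row key, mirroring what the mapper expects.
        String line = "ORD001\tUSA\tNY\tNew York\t28-07-2013\t29-07-2013\tToner\tshipped\t200$";
        String[] arr = line.split("\t");
        System.out.println("field count = " + arr.length);   // should print 9
        for (int i = 0; i < arr.length; i++) {
            System.out.println("arr[" + i + "] = " + arr[i]);
        }
    }
}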
Step 3 : Write the job's main class to configure and submit the MapReduce job:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class ImportTSVFile {

    public static void main(String[] args) throws Exception {
        // Picks up hbase-site.xml / core-site.xml from the classpath.
        Configuration conf = HBaseConfiguration.create();

        String table = "orders";                             // target HBase table created in Step 1
        String input = "/home/rajkrrsingh/mspfeed/ordfeed";  // input TSV file on HDFS

        Job job = new Job(conf, "Import from hdfs to hbase");
        job.setJarByClass(ImportTSVFile.class);
        job.setMapperClass(ImportFromTSVMapper.class);

        // Write mapper output directly to HBase; no reducers are needed.
        job.setOutputFormatClass(TableOutputFormat.class);
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Writable.class);
        job.setNumReduceTasks(0);

        FileInputFormat.addInputPath(job, new Path(input));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
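To run the job, package the two classes into a jar and submit it with the hadoop command, making sure the HBase jars are on the classpath; the jar name importtsv.jar below is only an example:

export HADOOP_CLASSPATH=$(hbase classpath)
hadoop jar importtsv.jar ImportTSVFile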
Once the job completes, the result can be verified from the HBase shell to confirm that your TSV file has been imported into the 'orders' table.
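For example, from the HBase shell you can count and scan the rows that were written:

hbase shell
count 'orders'
scan 'orders'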