In this tutorial I am going to demonstrate how to import a tab-separated file stored on HDFS into an HBase table.
Let's start by creating a table in HBase.
Step 1 : Create a table in HBase named 'orders'
with the column families 'ship_to_address','ord_date','ship_date','item','status','price'
create 'orders','ship_to_address','ord_date','ship_date','item','status','price'
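If you want to confirm that the table and its column families were created, a quick check from the same HBase shell will do:
describe 'orders'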
Here is our input TSV file stored on HDFS. Each line carries eight tab-separated fields: country, state, city, order date, ship date, item, status, and price:
USA NY New York 28-07-2013 29-07-2013 Toner shipped 200$
USA California San Fransico 29-07-2013 29-07-2013 Cati in process 150$
USA NY Rochester 28-07-2013 28-07-2013 Toner shipped 200$
USA NY Syracuse 21-07-2013 23-07-2013 Paper shipped 80$
USA NY Albany 21-07-2013 21-07-2013 Paper failed 80$
USA California Long Beach 26-07-2013 28-07-2013 Toner shipped 200$
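If the feed is not on HDFS yet, it can be copied up with the hadoop CLI. This is a minimal sketch: the local file name ordfeed.tsv is just a placeholder, and the target path matches the input path used in the driver class below:
hadoop fs -put ordfeed.tsv /home/rajkrrsingh/mspfeed/ordfeed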
Step 2 : Write your Mapper class as follows. It parses each tab-separated line into an HBase Put; since the feed carries no explicit order id, the line's byte offset within the file is used as the row key:
import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;

public class ImportFromTSVMapper extends
        Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> {

    @Override
    public void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        try {
            // Split the line into its eight tab-separated fields:
            // country, state, city, ord_date, ship_date, item, status, price
            String[] arr = line.toString().split("\t");
            // The feed has no explicit order id, so the line's byte offset is
            // used as the row key here; swap in a real id if your feed has one.
            byte[] rowKey = Bytes.toBytes(String.valueOf(offset.get()));
            Put put = new Put(rowKey);
            put.add(Bytes.toBytes("ship_to_address"), Bytes.toBytes("country"), Bytes.toBytes(arr[0]));
            put.add(Bytes.toBytes("ship_to_address"), Bytes.toBytes("state"), Bytes.toBytes(arr[1]));
            put.add(Bytes.toBytes("ship_to_address"), Bytes.toBytes("city"), Bytes.toBytes(arr[2]));
            put.add(Bytes.toBytes("ord_date"), Bytes.toBytes("ord_date"), Bytes.toBytes(arr[3]));
            put.add(Bytes.toBytes("ship_date"), Bytes.toBytes("ship_date"), Bytes.toBytes(arr[4]));
            put.add(Bytes.toBytes("item"), Bytes.toBytes("item"), Bytes.toBytes(arr[5]));
            put.add(Bytes.toBytes("status"), Bytes.toBytes("status"), Bytes.toBytes(arr[6]));
            put.add(Bytes.toBytes("price"), Bytes.toBytes("price"), Bytes.toBytes(arr[7]));
            context.write(new ImmutableBytesWritable(rowKey), put);
        } catch (Exception e) {
            // A malformed line is logged and skipped rather than failing the job.
            e.printStackTrace();
        }
    }
}
Step 3 : Write your main driver class to configure and submit the MapReduce job:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class ImportTSVFile {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        String table = "orders";                             // target HBase table created in Step 1
        String input = "/home/rajkrrsingh/mspfeed/ordfeed";  // TSV feed on HDFS

        Job job = new Job(conf, "Import from hdfs to hbase");
        job.setJarByClass(ImportTSVFile.class);
        job.setMapperClass(ImportFromTSVMapper.class);
        // Write the mapper's Puts straight into the HBase table.
        job.setOutputFormatClass(TableOutputFormat.class);
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Writable.class);
        // Map-only job: the mapper writes directly to HBase, so no reducers are needed.
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(input));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
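To run the job, package both classes into a jar and submit it with the hadoop command. This is a minimal sketch: importtsv.jar is a placeholder for whatever jar your build produces, and the HBase client jars must be on the classpath (for example by exporting the output of hbase classpath):
export HADOOP_CLASSPATH=`hbase classpath`
hadoop jar importtsv.jar ImportTSVFile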
The result can be verified from the HBase shell; your TSV file has now been imported into the HBase table.
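For a quick check, scan the table from the HBase shell; each input line should appear as one row, keyed by its byte offset, with the column families populated:
scan 'orders'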