Monday, August 10, 2015

Writing a parquet file using Hadoop mapreduce job

This is a simple example of writing a Parquet file from a Hadoop MapReduce job.
Env: Java 7, Maven 3.0.1, Hadoop 1.x

Step 1: Create a simple Java project and add the repository information and dependencies to the pom.xml

<repository>
  <id>sonatype-nexus-snapshots</id>
  <url>https://oss.sonatype.org/content/repositories/snapshots</url>
  <releases>
    <enabled>false</enabled>
  </releases>
  <snapshots>
    <enabled>true</enabled>
  </snapshots>
</repository>

<dependency>
  <groupId>com.twitter</groupId>
  <artifactId>parquet-common</artifactId>
  <version>1.1.2-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>com.twitter</groupId>
  <artifactId>parquet-encoding</artifactId>
  <version>1.1.2-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>com.twitter</groupId>
  <artifactId>parquet-column</artifactId>
  <version>1.1.2-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>com.twitter</groupId>
  <artifactId>parquet-hadoop</artifactId>
  <version>1.1.2-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>0.13.0</version>
</dependency>
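
Note that these are only fragments: the <repository> block goes inside <repositories> and the <dependency> blocks inside <dependencies> of your pom.xml. The hive-exec dependency is what pulls in the DataWritableWriteSupport and BinaryWritable classes used in the next two steps. A minimal pom skeleton (the project coordinates here are just placeholders, not from the original post) would look roughly like this:

<project>
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.test</groupId>
  <artifactId>parquet-mr-example</artifactId>
  <version>1.0</version>
  <repositories>
    <!-- the sonatype snapshot repository block shown above -->
  </repositories>
  <dependencies>
    <!-- the parquet and hive-exec dependency blocks shown above -->
  </dependencies>
</project>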

Step 2: Write a Mapper implementation
package com.test;

import java.io.IOException;

import org.apache.hadoop.hive.ql.io.parquet.writable.BinaryWritable;
import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;

import parquet.io.api.Binary;

public class ParquetMapper extends Mapper<LongWritable, Text, Void, ArrayWritable> {

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        // The write schema was placed in the configuration by the driver;
        // this call just confirms it is available before any records are mapped.
        DataWritableWriteSupport.getSchema(context.getConfiguration());
    }

    @Override
    public void map(LongWritable offset, Text line, Context context) throws IOException, InterruptedException {
        if (line != null && line.getLength() > 0) {
            // Each input line carries two tab-separated fields: empname and designation.
            String[] parts = line.toString().split("\t");
            Writable[] data = new Writable[2];
            for (int i = 0; i < 2; i++) {
                data[i] = new BinaryWritable(Binary.fromString(parts[i]));
            }
            ArrayWritable record = new ArrayWritable(Writable.class, data);
            // The key is unused; ParquetOutputFormat only consumes the value.
            context.write(null, record);
        }
    }
}
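
For reference, the mapper expects plain text input where every line has exactly two fields separated by a single tab character, matching the empname and designation columns defined in the driver below. A made-up sample input file (\t denotes a tab) would be:

John\tDeveloper
Mary\tManager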

Step 3: Finally, implement the job driver class as follows
package com.test;

import static parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static parquet.schema.Type.Repetition.OPTIONAL;
import static parquet.schema.Type.Repetition.REQUIRED;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import parquet.hadoop.ParquetOutputFormat;
import parquet.schema.MessageType;
import parquet.schema.PrimitiveType;

@SuppressWarnings("deprecation")
public class App extends Configured implements Tool {

    public int run(String[] args) throws Exception {

        Path inpath = new Path(args[0]);
        Path outpath = new Path(args[1]);

        Configuration conf = getConf();

        // 128 MB row groups, Snappy-compressed pages.
        conf.set(ParquetOutputFormat.BLOCK_SIZE, Integer.toString(128 * 1024 * 1024));
        conf.set(ParquetOutputFormat.COMPRESSION, "SNAPPY");

        // Parquet schema: two binary (string) columns, empname and designation.
        MessageType messagetype = new MessageType("employee",
                new PrimitiveType(REQUIRED, BINARY, "empname"),
                new PrimitiveType(OPTIONAL, BINARY, "designation"));

        // Publish the schema in the configuration so the write support
        // (and the mapper's setup()) can pick it up.
        DataWritableWriteSupport.setSchema(messagetype, conf);

        System.out.println("Schema: " + messagetype.toString());

        Job job = new Job(conf, "parquet-convert");

        job.setJarByClass(getClass());
        job.setJobName(getClass().getName());
        job.setMapOutputKeyClass(Void.class);
        job.setMapOutputValueClass(ArrayWritable.class);
        job.setOutputKeyClass(Void.class);
        job.setOutputValueClass(ArrayWritable.class);
        job.setMapperClass(ParquetMapper.class);
        // Map-only job: the mapper writes straight to the Parquet output format.
        job.setNumReduceTasks(0);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(ParquetOutputFormat.class);

        FileInputFormat.setInputPaths(job, inpath);
        ParquetOutputFormat.setOutputPath(job, outpath);

        // DataWritableWriteSupport turns each ArrayWritable record into a Parquet row.
        ParquetOutputFormat.setWriteSupportClass(job, DataWritableWriteSupport.class);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int returnCode = ToolRunner.run(new Configuration(), new App(), args);
        System.exit(returnCode);
    }
}
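
When the driver starts, the System.out.println call prints the schema in standard Parquet message syntax; the output should look roughly like this:

Schema: message employee {
  required binary empname;
  optional binary designation;
}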

Now we are good to go: build the project with Maven and run the job on your Hadoop cluster. Make sure you give the map tasks enough Java heap to avoid OOM errors, since the Parquet writer buffers a whole row group (the configured block size) in memory before flushing it to disk.
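
For example, assuming the build produces target/parquet-mr-example-1.0.jar (the jar name and the input/output paths below are placeholders), the build-and-run steps on a Hadoop 1 cluster would look roughly like this; the -D generic option is parsed by ToolRunner and raises the map task heap:

mvn clean package

hadoop jar target/parquet-mr-example-1.0.jar com.test.App \
  -D mapred.child.java.opts=-Xmx1024m \
  /user/test/employees.tsv /user/test/employees.parquet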
