Thursday, February 25, 2016

Hadoop MapReduce : Enabling JVM Profiling using -XPROF

The -Xprof profiler is the HotSpot profiler. HotSpot works by running Java code in interpreted mode, while running a profiler in parallel. The HotSpot profiler looks for "hot spots" in the code, i.e. methods that the JVM spends a significant amount of time running, and then compiles those methods into native generated code.
-Xprof is very handy to profile mapreduce code, here are few configuration parameters required to turn on profiling in mapreduce using -Xprof

Friday, February 19, 2016

Running Apache Kafka in Docker Container

Apache Kafka : Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system.
Docker containers : wrap up a piece of software in a complete filesystem that contains everything it needs to run: code, runtime, system tools, system libraries – anything you can install on a server. This guarantees that it will always run the same, regardless of the environment it is running in.
in this quick demo I will make use of a Docker image to run Apache Kafka in a Docker container.

Env: Single node pre-installed with RHEL7
Docker Installation:
yum update
curl -sSL  | sh
service docker start
service docker status // to check service is up

Apache Kafka setup
1. pull the docker image for the zookeeper
docker pull dockerkafka/zookeeper
2. pull docker image for kafka
docker pull dockerkafka/zookeeper

3. Run zookeeper docker container in detach mode
docker run -d --name zookeeper -p 2181:2181 dockerkafka/zookeeper

4. Run docker container for Kafka
docker run --name kafka -p 9092:9092 --link zookeeper:zookeeper dockerkafka/kafka  &

5. get host ip where zk is running
docker inspect --format '{{ .NetworkSettings.IPAddress }}' zookeeper

6. get host ip where kafka is running
docker inspect --format '{{ .NetworkSettings.IPAddress }}' kafka

7. in the next course of action lets create a topic for that start interactive shell
docker exec -t -i exec kafka bash --create --topic vmstat_logs --zookeeper --replication-factor 1 --partitions 1

8. start a kafka producer which will publish vmstat output of every one second to the broker
vmstat 1 | --topic vmstat_logs --broker-list

9. open a different shell and start a consumer
docker exec -t -i kafka bash --topic vmstat_logs --from-beginning --zookeeper

the consumer will start running and consume the vmstat logs from the producer.

Thursday, February 18, 2016

creating fat jar (uber jar) using maven shade plugin

Add the following build configuration in pom.xml and simple run mvn clean package and it will build you a fat jar including all the dependencies into the jar.


Apache Flume : Data ingestion from Kafka to HDFS

flume configuration to setup kafka as source and HDFS as sink.

tier1.channels = kafkachannel
tier1.sink = hdfssink

tier1.channels.kafkachannel.type =
tier1.channels.kafkachannel.brokerList = kafkabroker-1:9092,kafkabroker-2:9092
tier1.channels.kafkachannel.topic = logs
tier1.channels.kafkachannel.zookeeperConnect = kafkabroker-1:2181
tier1.channels.kafkachannel.parseAsFlumeEvent = false

tier1.sinks.hdfssink.type = hdfs
tier1.sinks.hdfssink.hdfs.path = /tmp/logs
tier1.sinks.hdfssink.hdfs.rollinterval = 5
tier1.sinks.hdfssink.hdfs.fileType = DataStream =  kafkachannel

Setup 2 node Apache Kafka cluster on Mac-OSX

This is a quick start guide to setup 2 node (broker) cluster on the Mac-OSX.
Steps to install:
1. Download apache kafka from the
2. extract to some folder in my example i have created in /tmp folder
3. Create kafka logs directory
mkdir /tmp/kafka-logs-1
mkdir /tmp/kafka-logs-2
4. copy config/ to config/ as I will be running the two broker on the same machine so the following property needs to be updated in
edit the accordingly
5. now start the zookeeper with
bin/ config/
6. Start kafka broker
bin/ config/ &

7. Start second kafka broker
bin/ config/ &

8. create kafka topic
bin/ --zookeeper localhost:2181 --create --topic general_topic --partitions 2 --replication-factor 2

9. list topic
bin/ --zookeeper localhost:2181 --describe --topic general_topic

10. now its time to test kafka setup, for that setup producer and consumers
bin/ --broker-list  localhost:9092 --topic general_topic

bin/ --zookeeper localhost:2181 --topic general_topic
bin/ --zookeeper localhost:2181 --topic general_topic --from-beginning

Wednesday, February 17, 2016

Hadoop Simple Authentication using java

Following is the sample java program to take the advantage of Hadoop UserGroupInfomation to perform a simple authentication by impersonating a user.

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;

public class SimpleSecurity {

    public static void main(String args[]) {

        try {
            UserGroupInformation ugi
                    = UserGroupInformation.createRemoteUser("rsingh");

            ugi.doAs(new PrivilegedExceptionAction<Void>() {

                public Void run() throws Exception {

                    Configuration conf = new Configuration();
                    conf.set("fs.defaultFS", "maprfs:///");
                    conf.set("hadoop.job.ugi", "rsingh");

                    FileSystem fs = FileSystem.get(conf);

                    fs.createNewFile(new Path("/user/rsingh/test"));

                    FileStatus[] status = fs.listStatus(new Path("/user/rsingh"));

                    for(int i=0;i<status.length;i++){



                    return null;

        } catch (Exception e) {

Compile and Run this code
javac -cp `hadoop classpath` 
ava -cp .:`hadoop classpath` SimpleSecurity
16/02/17 03:08:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/17 03:08:18 INFO fs.MapRFileSystem: User root is impersonating user rsingh

Monday, February 15, 2016

Oozie : Running a jobflow using java api

This is sample program to run the map-reduce job flow from oozie java client Api.copy the example directory shipped with the oozie installation on the dfs and create a sample program as follows:

import java.util.Properties;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;

public class OozieWFJavaApi{

public static void main(String args[]){
 OozieClient wc = new OozieClient("http://ip-10-0-0-233:11000/oozie");

    Properties conf = wc.createConfiguration();

    conf.setProperty(OozieClient.APP_PATH, "maprfs:///user/mapr/examples/apps/map-reduce/workflow.xml");
    conf.setProperty("jobTracker", "maprfs:///");
    conf.setProperty("nameNode", "maprfs:///");
    conf.setProperty("queueName", "default");
    conf.setProperty("oozie.use.system.libpath", "true");
    conf.setProperty("", "true");
    try {
        String jobId =;
        System.out.println("job, " + jobId + " submitted");

        while (wc.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) {
            System.out.println("Workflow job in Running State");
        System.out.println("WF job Completed");
    } catch (Exception r) {
        System.out.println("Job submission failed with exception " + r.getLocalizedMessage());

Compile and run using oozie client api
[mapr@ip-10-0-0-233 tmp]$ javac -cp .:/opt/mapr/oozie/oozie-4.2.0/lib/oozie-client-4.2.0-mapr-1510.jar 
[mapr@ip-10-0-0-233 tmp]$ java -cp .:/opt/mapr/oozie/oozie-4.2.0/lib/* OozieWFJavaApi
job, 0000007-160215031423760-oozie-mapr-W submitted
Workflow job in Running State
Workflow job in Running State
Workflow job in Running State
Workflow job in Running State
WF job Completed
Workflow id[0000007-160215031423760-oozie-mapr-W] status[SUCCEEDED]

Saturday, February 6, 2016

Shared Memory : Simple C program to demonstrate setting up Share memory

shared memory is memory that may be simultaneously accessed by multiple programs with an intent to provide communication among them or avoid redundant copies. Shared memory is an efficient means of passing data between programs.let see how it
actually work using simple c programs.lets create two process which will use shared memory

#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <stdlib.h>
#include <sys/ipc.h>
#include <sys/shm.h>

int main(int argc,char *argv[]){

int shmid;
key_t key;
char *shm;
char *memloc;
key = 9876;

shmid = shmget(key,100,IPC_CREAT | 0666);
if(shmid <0){
    printf("unable to get shared memory area identifier");

shm = shmat(shmid,NULL,0) ;
if (shm == (char *)-1)
    printf("map/unmap shared memory");

memcpy(shm,"this is shared content",22);
memloc = shm;
memloc +=22;

*memloc = 0;

while(*shm != '!')
    sleep (1);

return 0;

let create an another program which will read from the shared memory created by the process1.

#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <stdlib.h>
#include <sys/ipc.h>
#include <sys/shm.h>

int main(int argc,char *argv[]){

int shmid;
key_t key;
char *shm;
char *mloc;
key = 9876;

shmid = shmget(key,100,IPC_CREAT | 0666);
if(shmid <0){
    printf("unable to get shared memory area identifier");

shm = shmat(shmid,NULL,0) ;
if (shm == (char *)-1)
    printf("map/unmap shared memory");

for(mloc = shm; *mloc !=0;mloc++)
    printf ("%c\n",*mloc);

*shm = '!';
return 0;
gcc -o process1 process1.c 
gcc -o process2 process2.c 
./process1 -- will create and write to the shared memory and will be running
from another shell
./process2 -- will write the content from shared memory to the console and set content to the shared memory which terminate the process1.

ipcs -m
IPC status from  as of Sat Feb  6 13:24:00 IST 2016
T     ID     KEY        MODE       OWNER    GROUP
Shared Memory:
m  65536 0x00002694 --rw-rw-rw-   rsingh    staff

ipcrm -M 9876 // 9876 is hex equivalent of the shared memory key.