Sunday, March 20, 2016

Apache Flink is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams. This is a sample application that consumes the output of the vmstat command as a stream, so let's get our hands dirty.
mkdir flink-streaming-example
cd flink-streaming-example/
mkdir -p src/main/scala
cd src/main/scala/
vim FlinkStreamingExample.scala

import org.apache.flink.streaming.api.scala._

object FlinkStreamingExample {
  def main(args: Array[String]): Unit = {
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // read text lines from the socket that vmstat is piped into
    val socketVmStatStream = env.socketTextStream("ip-10-0-0-233", 9000)
    // print each incoming line to the task manager's stdout
    socketVmStatStream.print()
    // trigger execution of the streaming job
    env.execute()
  }
}

cd -
vim build.sbt
name := "flink-streaming-examples"

version := "1.0"
scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % "1.0.0",
  "org.apache.flink" %% "flink-clients" % "1.0.0",
  "org.apache.flink" %% "flink-streaming-scala" % "1.0.0")

sbt clean package

Stream some test data:
vmstat 1 | nc -l 9000

Now submit the Flink job:
bin/flink run /root/flink-streaming-example/target/scala-2.10/flink-streaming-examples_2.10-1.0.jar
03/20/2016 08:04:12 Job execution switched to status RUNNING.
03/20/2016 08:04:12 Source: Socket Stream -> Sink: Unnamed(1/1) switched to SCHEDULED 
03/20/2016 08:04:12 Source: Socket Stream -> Sink: Unnamed(1/1) switched to DEPLOYING 
03/20/2016 08:04:12 Source: Socket Stream -> Sink: Unnamed(1/1) switched to RUNNING 

Let's see the output of the job:
tailf flink-1.0.0/log/flink-root-jobmanager-0-ip-10-0-0-233.out   # you will see the vmstat output here
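
The job above only prints the raw stream. The same socket stream can also be transformed before the sink; below is a minimal sketch (the class name is hypothetical, host/port and the Flink 1.0 Scala API are the same as above) that splits each vmstat line into whitespace-separated fields and counts how often each value appears per 5-second window:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object VmStatWindowCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val lines = env.socketTextStream("ip-10-0-0-233", 9000)

    // split each vmstat line into fields and count occurrences per 5-second window
    val counts = lines
      .flatMap(_.trim.split("\\s+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)

    counts.print()
    env.execute("vmstat window count")
  }
}

It builds and submits exactly like the example above (sbt clean package, then bin/flink run).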

Thursday, February 25, 2016

Hadoop MapReduce : Enabling JVM Profiling using -Xprof

The -Xprof profiler is the HotSpot profiler. HotSpot works by running Java code in interpreted mode, while running a profiler in parallel. The HotSpot profiler looks for "hot spots" in the code, i.e. methods that the JVM spends a significant amount of time running, and then compiles those methods into native generated code.
-Xprof is very handy for profiling MapReduce code. Here are a few configuration parameters required to turn on profiling in MapReduce using -Xprof:
mapreduce.task.profile='true'
mapreduce.task.profile.maps='0-'
mapreduce.task.profile.reduces='0-'
mapreduce.task.profile.params='-Xprof'
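
The same properties can also be set from the driver code instead of the job configuration. A minimal sketch, assuming a Hadoop 2.x MapReduce driver written in Scala (the class name and job wiring are placeholders):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job

object ProfiledJobDriver {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // turn on task profiling with -Xprof for the first three map and reduce attempts
    conf.setBoolean("mapreduce.task.profile", true)
    conf.set("mapreduce.task.profile.maps", "0-2")
    conf.set("mapreduce.task.profile.reduces", "0-2")
    conf.set("mapreduce.task.profile.params", "-Xprof")

    val job = Job.getInstance(conf, "profiled job")
    // configure mapper, reducer, input and output paths here as usual,
    // then submit with job.waitForCompletion(true)
  }
}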

Friday, February 19, 2016

Running Apache Kafka in a Docker Container

Apache Kafka : Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system.
Docker containers : wrap up a piece of software in a complete filesystem that contains everything it needs to run: code, runtime, system tools, system libraries – anything you can install on a server. This guarantees that it will always run the same, regardless of the environment it is running in.
In this quick demo I will use a Docker image to run Apache Kafka in a Docker container.

Env: Single node pre-installed with RHEL7
Docker Installation:
yum update
curl -sSL https://get.docker.com/  | sh
service docker start
service docker status   # check that the service is up

Apache Kafka setup
1. Pull the Docker image for ZooKeeper
docker pull dockerkafka/zookeeper
2. Pull the Docker image for Kafka
docker pull dockerkafka/kafka

3. Run the ZooKeeper Docker container in detached mode
docker run -d --name zookeeper -p 2181:2181 dockerkafka/zookeeper

4. Run the Docker container for Kafka, linked to the ZooKeeper container
docker run --name kafka -p 9092:9092 --link zookeeper:zookeeper dockerkafka/kafka  &

5. Get the IP address of the ZooKeeper container
docker inspect --format '{{ .NetworkSettings.IPAddress }}' zookeeper
172.17.0.2

6. Get the IP address of the Kafka container
docker inspect --format '{{ .NetworkSettings.IPAddress }}' kafka

7. Next, create a topic. Start an interactive shell inside the Kafka container:
docker exec -t -i kafka bash
kafka-topics.sh --create --topic vmstat_logs --zookeeper 172.17.0.2:2181 --replication-factor 1 --partitions 1

8. Start a Kafka producer that publishes the vmstat output every second to the broker
vmstat 1 | kafka-console-producer.sh --topic vmstat_logs --broker-list 172.17.0.3:9092

9. Open a different shell and start a consumer
docker exec -t -i kafka bash
kafka-console-consumer.sh --topic vmstat_logs --from-beginning --zookeeper 172.17.0.2:2181

The consumer will start running and consume the vmstat logs from the producer.
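
Instead of the console scripts, the topic can also be produced to programmatically. Below is a minimal Scala sketch, assuming the kafka-clients library is on the classpath; the broker address is the Kafka container IP from step 6 (adjust it to your environment), and the class name is just an example:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object VmStatProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // broker address of the kafka container (see step 6)
    props.put("bootstrap.servers", "172.17.0.3:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // read vmstat-style lines from stdin and publish each one to the vmstat_logs topic
    scala.io.Source.stdin.getLines().foreach { line =>
      producer.send(new ProducerRecord[String, String]("vmstat_logs", line))
    }
    producer.close()
  }
}

Piping vmstat 1 into this program gives the same result as step 8.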

Thursday, February 18, 2016

Creating a fat jar (uber jar) using the Maven Shade plugin

Add the following build configuration to pom.xml and simply run mvn clean package; it will build a fat jar that includes all the dependencies.

<build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <finalName>uber-${project.artifactId}-${project.version}</finalName>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
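
With this configuration, mvn clean package leaves the shaded jar in target/, named after the finalName pattern above (uber-<artifactId>-<version>.jar), alongside the regular thin jar.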

Apache Flume : Data ingestion from Kafka to HDFS

Flume configuration to set up Kafka as the source of events (via a Kafka channel) and HDFS as the sink.

tier1.channels = kafkachannel
tier1.sinks = hdfssink

tier1.channels.kafkachannel.type = org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.kafkachannel.brokerList = kafkabroker-1:9092,kafkabroker-2:9092
tier1.channels.kafkachannel.topic = logs
tier1.channels.kafkachannel.zookeeperConnect = kafkabroker-1:2181
tier1.channels.kafkachannel.parseAsFlumeEvent = false

tier1.sinks.hdfssink.type = hdfs
tier1.sinks.hdfssink.hdfs.path = /tmp/logs
tier1.sinks.hdfssink.hdfs.rollInterval = 5
tier1.sinks.hdfssink.hdfs.fileType = DataStream
tier1.sinks.hdfssink.channel =  kafkachannel
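
Assuming the configuration above is saved to a file, say kafka-to-hdfs.conf (the file name is just an example), the agent can be started with the standard flume-ng command:

flume-ng agent --conf ./conf --conf-file kafka-to-hdfs.conf --name tier1 -Dflume.root.logger=INFO,console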