Sunday, March 20, 2016
Apache Flink is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams. The following is a sample application that consumes the output of the vmstat command as a stream, so let's get our hands dirty.
mkdir flink-steaming-example
cd flink-steaming-example/
mkdir -p src/main/scala
cd src/main/scala/
vim FlinkStreamingExample.scala

import org.apache.flink.streaming.api.scala._

object FlinkStreamingExample{
    def main(args:Array[String]){
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val socketVmStatStream = env.socketTextStream("ip-10-0-0-233",9000);
        socketVmStatStream.print
        env.execute()
    }
}

cd -
vim build.sbt

name := "flink-streaming-examples"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies ++= Seq("org.apache.flink" %% "flink-scala" % "1.0.0",
  "org.apache.flink" %% "flink-clients" % "1.0.0",
  "org.apache.flink" %% "flink-streaming-scala" % "1.0.0")

sbt clean package

Stream some test data:
vmstat 1 | nc -l 9000

Now submit the Flink job:
bin/flink run /root/flink-steaming-example/target/scala-2.10/flink-streaming-examples_2.10-1.0.jar
03/20/2016 08:04:12 Job execution switched to status RUNNING.
03/20/2016 08:04:12 Source: Socket Stream -> Sink: Unnamed(1/1) switched to SCHEDULED
03/20/2016 08:04:12 Source: Socket Stream -> Sink: Unnamed(1/1) switched to DEPLOYING
03/20/2016 08:04:12 Source: Socket Stream -> Sink: Unnamed(1/1) switched to RUNNING

Let's see the output of the job:
tailf flink-1.0.0/log/flink-root-jobmanager-0-ip-10-0-0-233.out -- will see the vmstat output
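For reference, here is a rough Java equivalent of the same socket-streaming job. This is only a minimal sketch, assuming the flink-streaming-java and flink-clients 1.0.0 dependencies are on the classpath; the host name and port mirror the Scala example above, while the class name and the header-filtering regex are my own additions for illustration.

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkStreamingJavaExample {
    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // read text lines from the socket fed by "vmstat 1 | nc -l 9000"
        DataStream<String> vmstat = env.socketTextStream("ip-10-0-0-233", 9000);

        // drop the vmstat header lines and keep only the numeric samples
        DataStream<String> samples = vmstat.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String line) {
                return line.trim().matches("\\d.*");
            }
        });

        samples.print();
        env.execute("vmstat socket stream");
    }
}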
Thursday, February 25, 2016
Hadoop MapReduce : Enabling JVM Profiling using -XPROF
The -Xprof profiler is the HotSpot profiler. HotSpot works by running Java code in interpreted mode, while running a profiler in parallel. The HotSpot profiler looks for "hot spots" in the code, i.e. methods that the JVM spends a significant amount of time running, and then compiles those methods into native generated code.
-Xprof is very handy for profiling MapReduce code. Here are the few configuration parameters required to turn on profiling in MapReduce using -Xprof:
mapreduce.task.profile='true'
mapreduce.task.profile.maps='0-'
mapreduce.task.profile.reduces='0-'
mapreduce.task.profile.params='-Xprof'
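These properties can be passed with -D on the command line or set programmatically in the job driver. Below is a minimal driver sketch; the class name ProfiledJobDriver and the job name are illustrative, only the four profiling properties come from the post.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ProfiledJobDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // turn on task profiling and profile every map and reduce attempt
        conf.setBoolean("mapreduce.task.profile", true);
        conf.set("mapreduce.task.profile.maps", "0-");
        conf.set("mapreduce.task.profile.reduces", "0-");

        // use the HotSpot -Xprof profiler instead of the default profiler options
        conf.set("mapreduce.task.profile.params", "-Xprof");

        Job job = Job.getInstance(conf, "profiled-job");
        // ... set mapper, reducer, input and output paths as usual ...
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}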
Friday, February 19, 2016
Running Apache Kafka in a Docker Container
Apache Kafka : Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system.
Docker containers : wrap up a piece of software in a complete filesystem that contains everything it needs to run: code, runtime, system tools, system libraries – anything you can install on a server. This guarantees that it will always run the same, regardless of the environment it is running in.
In this quick demo I will make use of a Docker image to run Apache Kafka in a Docker container.
Env: Single node pre-installed with RHEL7
Docker Installation:
yum update
curl -sSL https://get.docker.com/ | sh
service docker start
service docker status // to check the service is up
Apache Kafka setup
1. pull the docker image for the zookeeper
docker pull dockerkafka/zookeeper
2. pull the docker image for kafka
docker pull dockerkafka/kafka
3. Run zookeeper docker container in detach mode
docker run -d --name zookeeper -p 2181:2181 dockerkafka/zookeeper
4. Run docker container for Kafka
docker run --name kafka -p 9092:9092 --link zookeeper:zookeeper dockerkafka/kafka &
5. get host ip where zk is running
docker inspect --format '{{ .NetworkSettings.IPAddress }}' zookeeper
172.17.0.2
6. get host ip where kafka is running
docker inspect --format '{{ .NetworkSettings.IPAddress }}' kafka
7. next, let's create a topic; for that, start an interactive shell in the kafka container
docker exec -t -i kafka bash
kafka-topics.sh --create --topic vmstat_logs --zookeeper 172.17.0.2:2181 --replication-factor 1 --partitions 1
8. start a kafka producer which will publish vmstat output every second to the broker
vmstat 1 | kafka-console-producer.sh --topic vmstat_logs --broker-list 172.17.0.3:9092
9. open a different shell and start a consumer
docker exec -t -i kafka bash
kafka-console-consumer.sh --topic vmstat_logs --from-beginning --zookeeper 172.17.0.2:2181
the consumer will start running and consume the vmstat logs from the producer.
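As an alternative to the console consumer, the topic can also be read from code. Here is a minimal sketch using the Kafka consumer API; it assumes the kafka-clients library (0.9 or later) is on the classpath. The broker IP 172.17.0.3 and the topic vmstat_logs come from the steps above, while the class name and group id are arbitrary choices for illustration.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class VmstatLogConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.17.0.3:9092");
        props.put("group.id", "vmstat-consumer-group"); // arbitrary consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singletonList("vmstat_logs"));

        // poll the broker and print each vmstat line published by the producer
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}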
Thursday, February 18, 2016
Creating a fat jar (uber jar) using the Maven Shade plugin
Add the following build configuration to pom.xml and simply run mvn clean package; it will build you a fat jar that includes all the dependencies.
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
        </execution>
      </executions>
      <configuration>
        <finalName>uber-${artifactId}-${version}</finalName>
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <configuration>
        <source>1.7</source>
        <target>1.7</target>
      </configuration>
    </plugin>
  </plugins>
</build>
Apache Flume : Data ingestion from Kafka to HDFS
Flume agent configuration that uses a Kafka channel to consume events from Kafka and an HDFS sink to write them out.
tier1.channels = kafkachannel
tier1.sinks = hdfssink

tier1.channels.kafkachannel.type = org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.kafkachannel.brokerList = kafkabroker-1:9092,kafkabroker-2:9092
tier1.channels.kafkachannel.topic = logs
tier1.channels.kafkachannel.zookeeperConnect = kafkabroker-1:2181
tier1.channels.kafkachannel.parseAsFlumeEvent = false

tier1.sinks.hdfssink.type = hdfs
tier1.sinks.hdfssink.hdfs.path = /tmp/logs
tier1.sinks.hdfssink.hdfs.rollInterval = 5
tier1.sinks.hdfssink.hdfs.fileType = DataStream
tier1.sinks.hdfssink.channel = kafkachannel
Setup 2 node Apache Kafka cluster on Mac-OSX
This is a quick-start guide to set up a 2-node (broker) Kafka cluster on Mac OS X.
Steps to install:
1. Download Apache Kafka from http://kafka.apache.org/downloads.html
2. Extract it to some folder; in this example I have extracted it under /tmp
3. Create the Kafka log directories
mkdir /tmp/kafka-logs-1
mkdir /tmp/kafka-logs-2

4. Copy config/server.properties to config/server2.properties. Since I will be running the two brokers on the same machine, the following properties need to be set in server.properties:
broker.id=0
port=9092
log.dirs=/tmp/kafka-logs-1

Edit server2.properties accordingly:
broker.id=1
port=9091
log.dirs=/tmp/kafka-logs-2

5. Now start ZooKeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties

6. Start the first Kafka broker:
bin/kafka-server-start.sh config/server.properties &
7. Start second kafka broker
bin/kafka-server-start.sh config/server2.properties &
8. create kafka topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic general_topic --partitions 2 --replication-factor 2
9. Describe the topic
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic general_topic
10. Now it's time to test the Kafka setup; for that, start a producer and consumers:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic general_topic
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic general_topic
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic general_topic --from-beginning
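Messages can also be published from code instead of the console producer. Below is a minimal producer sketch; it assumes the kafka-clients library is on the classpath, the broker ports and topic name match the setup above, and the class name and message contents are illustrative.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class GeneralTopicProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9091"); // both brokers
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        // send a handful of test messages to the topic created in step 8
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("general_topic", "message-" + i));
        }
        producer.close();
    }
}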
Wednesday, February 17, 2016
Hadoop Simple Authentication using Java
The following is a sample Java program that takes advantage of Hadoop UserGroupInformation to perform simple authentication by impersonating a user.
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;

public class SimpleSecurity {

    public static void main(String args[]) {
        try {
            // impersonate the user "rsingh"
            UserGroupInformation ugi = UserGroupInformation.createRemoteUser("rsingh");

            // run the filesystem calls as the impersonated user
            ugi.doAs(new PrivilegedExceptionAction<Void>() {
                public Void run() throws Exception {
                    Configuration conf = new Configuration();
                    conf.set("fs.defaultFS", "maprfs:///");
                    conf.set("hadoop.job.ugi", "rsingh");

                    FileSystem fs = FileSystem.get(conf);
                    fs.createNewFile(new Path("/user/rsingh/test"));

                    FileStatus[] status = fs.listStatus(new Path("/user/rsingh"));
                    for (int i = 0; i < status.length; i++) {
                        System.out.println(status[i].getPath());
                    }
                    return null;
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Compile and Run this code
javac -cp `hadoop classpath` SimpleSecurity.java
java -cp .:`hadoop classpath` SimpleSecurity
16/02/17 03:08:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/17 03:08:18 INFO fs.MapRFileSystem: User root is impersonating user rsingh
maprfs:///user/rsingh/test
Monday, February 15, 2016
Oozie : Running a job flow using the Java API
This is a sample program to run the map-reduce job flow using the Oozie Java client API. Copy the examples directory shipped with the Oozie installation onto the DFS and create a sample program as follows:
import java.util.Properties;

import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;

public class OozieWFJavaApi {

    public static void main(String args[]) {
        OozieClient wc = new OozieClient("http://ip-10-0-0-233:11000/oozie");

        Properties conf = wc.createConfiguration();
        conf.setProperty(OozieClient.APP_PATH, "maprfs:///user/mapr/examples/apps/map-reduce/workflow.xml");
        conf.setProperty("jobTracker", "maprfs:///");
        conf.setProperty("nameNode", "maprfs:///");
        conf.setProperty("queueName", "default");
        conf.setProperty("oozie.use.system.libpath", "true");
        conf.setProperty("oozie.wf.rerun.failnodes", "true");
        conf.setProperty("outputDir", "map-reduce");

        try {
            String jobId = wc.run(conf);
            System.out.println("job, " + jobId + " submitted");

            while (wc.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) {
                System.out.println("Workflow job in Running State");
                Thread.sleep(1000);
            }
            System.out.println("WF job Completed");
            System.out.println(wc.getJobInfo(jobId));
        } catch (Exception r) {
            System.out.println("Job submission failed with exception " + r.getLocalizedMessage());
        }
    }
}

Compile and run using the Oozie client API:
[mapr@ip-10-0-0-233 tmp]$ javac -cp .:/opt/mapr/oozie/oozie-4.2.0/lib/oozie-client-4.2.0-mapr-1510.jar OozieWFJavaApi.java
[mapr@ip-10-0-0-233 tmp]$ java -cp .:/opt/mapr/oozie/oozie-4.2.0/lib/* OozieWFJavaApi
job, 0000007-160215031423760-oozie-mapr-W submitted
Workflow job in Running State
Workflow job in Running State
Workflow job in Running State
Workflow job in Running State
WF job Completed
Workflow id[0000007-160215031423760-oozie-mapr-W] status[SUCCEEDED]
Saturday, February 6, 2016
Shared Memory : Simple C programs to demonstrate setting up shared memory
Shared memory is memory that may be simultaneously accessed by multiple programs, with the intent of providing communication among them or avoiding redundant copies. It is an efficient means of passing data between programs. Let's see how it actually works using simple C programs: we will create two processes that use the same shared memory segment.
// process1.c
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>

int main(int argc, char *argv[]) {
    int shmid;
    key_t key;
    char *shm;
    char *memloc;

    key = 9876;

    /* create (or attach to) a 100-byte shared memory segment */
    shmid = shmget(key, 100, IPC_CREAT | 0666);
    if (shmid < 0) {
        printf("unable to get shared memory area identifier");
        exit(1);
    }

    /* attach the segment to this process's address space */
    shm = shmat(shmid, NULL, 0);
    if (shm == (char *)-1) {
        printf("map/unmap shared memory");
        exit(1);
    }

    /* write some content into the shared segment */
    memcpy(shm, "this is shared content", 22);
    memloc = shm;
    memloc += 22;
    *memloc = 0;

    /* wait until the reader acknowledges by writing '!' */
    while (*shm != '!')
        sleep(1);

    return 0;
}

Let's create another program which will read from the shared memory created by process1.
// process2.c
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>

int main(int argc, char *argv[]) {
    int shmid;
    key_t key;
    char *shm;
    char *mloc;

    key = 9876;

    /* attach to the segment created by process1 */
    shmid = shmget(key, 100, IPC_CREAT | 0666);
    if (shmid < 0) {
        printf("unable to get shared memory area identifier");
        exit(1);
    }

    shm = shmat(shmid, NULL, 0);
    if (shm == (char *)-1) {
        printf("map/unmap shared memory");
        exit(1);
    }

    /* print the shared content character by character */
    for (mloc = shm; *mloc != 0; mloc++)
        printf("%c\n", *mloc);

    /* signal process1 that we are done */
    *shm = '!';

    return 0;
}

Compilation:
gcc -o process1 process1.c
gcc -o process2 process2.c

Run:
./process1   -- will create and write to the shared memory and keep running
-- from another shell
./process2   -- will print the content of the shared memory to the console and then write to it, which terminates process1

ipcs -m
IPC status from <running system> as of Sat Feb 6 13:24:00 IST 2016
T     ID     KEY        MODE       OWNER    GROUP
Shared Memory:
m  65536 0x00002694 --rw-rw-rw-   rsingh    staff

ipcrm -M 9876   // 9876 is the decimal shared memory key used in the programs (0x00002694 in the ipcs output)
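There is no direct SysV shared-memory API in Java, but a comparable effect between JVM processes can be achieved with a memory-mapped file, which is a different mechanism from the shmget/shmat calls above. A rough sketch of the writer side follows; the path /tmp/shared.dat, the 100-byte size, and the class name are arbitrary choices for illustration, not part of the original example.

import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;

public class SharedMemoryWriter {
    public static void main(String[] args) throws Exception {
        // map a 100-byte region of a file; another process can map the same file to read it
        RandomAccessFile file = new RandomAccessFile("/tmp/shared.dat", "rw");
        FileChannel channel = file.getChannel();
        MappedByteBuffer shared = channel.map(FileChannel.MapMode.READ_WRITE, 0, 100);

        // write some content into the shared region
        shared.put("this is shared content".getBytes(StandardCharsets.UTF_8));
        shared.force(); // flush the mapped region so a reader process sees it

        channel.close();
        file.close();
    }
}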
Saturday, January 9, 2016
BTrace : Tracing a Java Application
BTrace is a helpful tool that lets you run Java-like scripts on top of a live JVM to capture or aggregate any form of variable state without restarting the JVM or deploying new code. This enables you to do pretty powerful things like printing the stack traces of threads, writing to a specific file, or printing the number of items of any queue or connection pool and many more.
Let's trace a simple Java application to learn how much time a method takes to complete at runtime, without modifying the code.
Installation:
- Download the latest version of BTrace from https://kenai.com/projects/btrace/downloads/directory/releases and unzip it to some location.
- Add the environment variables as follows:

export BTRACE_HOME=/root/btrace-bin
PATH=$PATH:$BTRACE_HOME/bin

This is the sample code which I want to trace:
package com.rajkrrsingh.trace.test;

public class BtraceTest {

    public void execute(int i) {
        System.out.println("executing " + i + " time");
        try {
            Thread.sleep(Math.abs((int) (Math.random() * 1000)));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void doExcute() {
        int i = 0;
        while (true) {
            execute(i);
            i++;
        }
    }

    public static void main(String[] args) {
        BtraceTest btraceTest = new BtraceTest();
        btraceTest.doExcute();
    }
}
BTrace requires a tracing script that contains annotated actions describing what to trace, and it can add additional capabilities; the user guide describes the different annotations and what they signify. The following tracing script traces the execute method and prints its execution time. To compile this script you need the BTrace jars on the classpath.
package com.rajkrrsingh.trace.test;

import static com.sun.btrace.BTraceUtils.jstack;
import static com.sun.btrace.BTraceUtils.println;
import static com.sun.btrace.BTraceUtils.str;
import static com.sun.btrace.BTraceUtils.strcat;
import static com.sun.btrace.BTraceUtils.timeMillis;

import com.sun.btrace.annotations.BTrace;
import com.sun.btrace.annotations.Kind;
import com.sun.btrace.annotations.Location;
import com.sun.btrace.annotations.OnMethod;
import com.sun.btrace.annotations.TLS;

@BTrace
public class TraceMethodTime {

    @TLS
    static Long beginTime;

    @OnMethod(clazz="com.rajkrrsingh.trace.test.BtraceTest", method="execute")
    public static void traceExecuteBegin() {
        println("method Start!");
        beginTime = timeMillis();
    }

    @OnMethod(clazz="com.rajkrrsingh.trace.test.BtraceTest", method="execute", location=@Location(Kind.RETURN))
    public static void traceExcute() {
        println(strcat(strcat("btrace.test.MyBtraceTest.execute time is: ", str(timeMillis() - beginTime)), "ms"));
        println("method End!");
        jstack();
    }
}
Now it is time to trace the already running Java application. To find its process id, use the jps command. Attach BTrace to the process id as follows and it will start tracing the application:
btrace process-id script-location
btrace 22618 com/rajkrrsingh/trace/test/TraceMethodTime.java

BTrace will attach to the running JVM and you will see the following output on the console:
method Start!
btrace.test.MyBtraceTest.execute time is: 323ms
method End!
com.rajkrrsingh.trace.test.BtraceTest.execute(BtraceTest.java:12)
com.rajkrrsingh.trace.test.BtraceTest.doExcute(BtraceTest.java:18)
com.rajkrrsingh.trace.test.BtraceTest.main(BtraceTest.java:25)
method Start!
btrace.test.MyBtraceTest.execute time is: 655ms
method End!
com.rajkrrsingh.trace.test.BtraceTest.execute(BtraceTest.java:12)
com.rajkrrsingh.trace.test.BtraceTest.doExcute(BtraceTest.java:18)
com.rajkrrsingh.trace.test.BtraceTest.main(BtraceTest.java:25)
Thursday, January 7, 2016
Spark Thrift Server Configuration in YARN mode
Env: MapR, Spark 1.4.1
Start the Spark Thrift Server on the MapR cluster as follows (start the Thrift Server as a non-root user):
/opt/mapr/spark/spark-1.4.1/sbin/start-thriftserver.sh --master yarn --hiveconf hive.server2.thrift.bind.host=`hostname` --hiveconf hive.server2.thrift.port=10000

Check for any errors or exceptions in the logs (in my case: tailf /opt/mapr/spark/spark-1.4.1/logs/spark-mapr-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-ip-10-0-0-233.out)
If there are no errors or exceptions in the logs, try connecting using the beeline client shipped with the Spark distribution:
/opt/mapr/spark/spark-1.4.1/bin/beeline
Beeline version 1.4.1 by Apache Hive
beeline> !connect jdbc:hive2://10.0.0.233:10000/default;auth=noSasl
scan complete in 1ms
Connecting to jdbc:hive2://10.0.0.233:10000/default;auth=noSasl
Enter username for jdbc:hive2://10.0.0.233:10000/default;auth=noSasl:
Enter password for jdbc:hive2://10.0.0.233:10000/default;auth=noSasl:
Connected to: Spark SQL (version 1.4.1)
Driver: Spark Project Core (version 1.4.1)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://10.0.0.233:10000/default> show tables;
+------------------------------+--------------+
|          tableName           | isTemporary  |
+------------------------------+--------------+
| b1                           | false        |
| b2                           | false        |
| bob                          | false        |
| ct                           | false        |
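Applications can hit the same endpoint over JDBC using the Hive driver. Here is a minimal sketch; it assumes the hive-jdbc driver and its dependencies are on the classpath, and the host, port, and auth=noSasl setting mirror the beeline URL above, while the class name is illustrative.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class SparkThriftJdbcClient {
    public static void main(String[] args) throws Exception {
        // register the Hive JDBC driver
        Class.forName("org.apache.hive.jdbc.HiveDriver");

        // same connection string used with beeline
        Connection con = DriverManager.getConnection(
                "jdbc:hive2://10.0.0.233:10000/default;auth=noSasl", "", "");

        Statement stmt = con.createStatement();
        ResultSet rs = stmt.executeQuery("show tables");
        while (rs.next()) {
            System.out.println(rs.getString(1));
        }

        rs.close();
        stmt.close();
        con.close();
    }
}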