Friday, December 4, 2015

Google protobuf installation on Mac

$wget https://github.com/google/protobuf/releases/download/v2.5.0/protobuf-2.5.0.tar.bz2
$tar xvf protobuf-2.5.0.tar.bz2
$cd protobuf-2.5.0
$./configure CC=clang CXX=clang++ CXXFLAGS='-std=c++11 -stdlib=libc++ -O3 -g' LDFLAGS='-stdlib=libc++' LIBS="-lc++ -lc++abi"
$make -j 4 
$sudo make install
$protoc --version
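A quick smoke test of the new installation can be done with a tiny message definition. This is a sketch: person.proto and the Person message are made-up names for illustration, and the protoc invocation is shown commented because it requires the compiler built above.

```shell
# Write a minimal proto2 message definition (hypothetical example).
cat > person.proto <<'EOF'
package demo;

message Person {
  required string name = 1;
  optional int32 id = 2;
}
EOF

# Compile it with the freshly installed compiler (emits person.pb.h/.cc):
# protoc --cpp_out=. person.proto
grep -c "message Person" person.proto
```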

Sunday, November 29, 2015

Amazon EMR : Creating a Spark Cluster and Running a Job

Amazon Elastic MapReduce (EMR) is an Amazon Web Services (AWS) offering for data processing and analysis. Amazon EMR provides an expandable, low-configuration service as an easier alternative to running an in-house compute cluster.
In this example, let's spin up a Spark cluster and run a Spark job that crunches Apache logs and filters out only the error lines.

Prerequisites:
AWS Account
install and configure the AWS CLI tool
create default roles

Spark Job
follow these steps to create a sample job
mkdir SampleSparkApp
cd SampleSparkApp
mkdir -p src/main/scala
cd src/main/scala
vim SimpleApp.scala

package com.example.project

/**
 * @author rsingh
 */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "s3://rks-clus-data/log.txt" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val errors = logData.filter(line => line.contains("error"))
    errors.saveAsTextFile("s3://rks-clus-data/error-log.txt") // writes a directory of part files under this path
  }
}

cd -
vim build.sbt

name := "Spark Log Job"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "1.5.0","org.apache.spark" %% "spark-streaming" % "1.5.0")

* now build the project using sbt
sbt package

after a successful build the jar will be available at target/scala-2.10/spark-log-job_2.10-1.0.jar
upload the job jar to the s3 bucket
aws s3 cp target/scala-2.10/spark-log-job_2.10-1.0.jar s3://rks-clus-data/job-jars/

upload the sample logs to your s3 bucket
aws s3 cp log.txt s3://rks-clus-data/

create the job step definition as follows
cat step.json
cat step.json
[
  {
    "Name": "SampleSparkApp",
    "Type": "CUSTOM_JAR",
    "Jar": "command-runner.jar",
    "Args": [
      "spark-submit",
      "--deploy-mode", "cluster",
      "--class", "com.example.project.SimpleApp",
      "s3://rks-clus-data/job-jars/spark-log-job_2.10-1.0.jar",
      "s3://rks-clus-data/log.txt",
      "s3://rks-clus-data"
    ],
    "ActionOnFailure": "TERMINATE_CLUSTER"
  }
]
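The step file can be sanity-checked locally before the cluster is launched. A minimal sketch, assuming python3 is available (any JSON validator works); the jar path must match whatever you actually uploaded:

```shell
cat > step.json <<'EOF'
[
  {
    "Name": "SampleSparkApp",
    "Type": "CUSTOM_JAR",
    "Jar": "command-runner.jar",
    "Args": [
      "spark-submit",
      "--deploy-mode", "cluster",
      "--class", "com.example.project.SimpleApp",
      "s3://rks-clus-data/job-jars/spark-log-job_2.10-1.0.jar",
      "s3://rks-clus-data/log.txt",
      "s3://rks-clus-data"
    ],
    "ActionOnFailure": "TERMINATE_CLUSTER"
  }
]
EOF

# Fails loudly on malformed JSON before EMR ever sees the file:
python3 -m json.tool step.json > /dev/null && echo "step.json OK"
```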

now spin up an Amazon EMR cluster with the auto-terminate option
    aws emr create-cluster \
    --name "Single Node Spark Cluster" \
    --instance-type m3.xlarge \
    --release-label emr-4.2.0 \
    --instance-count 1 \
    --use-default-roles \
    --applications Name=Spark \
    --steps file://step.json \
    --auto-terminate

The above command will spin up a Spark cluster on EMR and run the job. The cluster will terminate automatically when the step completes, regardless of success or failure.

Tuesday, November 24, 2015

Apache Storm Sample Application

These are baby steps to build and run a Storm application.
prerequisites: Java 7, Maven
create a default Maven archetype
$  mvn archetype:generate
$  cd StormSampleApp/
update pom.xml as follows: https://github.com/rajkrrsingh/StormSampleApp/blob/master/pom.xml
create a driver class composed of the spout definition, bolt definitions, and the topology configuration, as in
   https://github.com/rajkrrsingh/StormSampleApp/blob/master/src/main/java/com/rajkrrsingh/storm/SampleTopology.java
$  mvn package
$  storm jar target/StormSampleApp-1.0-SNAPSHOT-jar-with-dependencies.jar com.rajkrrsingh.storm.SampleTopology

Saturday, November 21, 2015

Apache Drill : Run query through Microsoft Excel

In this short demo I will show how to set up Drill with Microsoft Excel and execute a query. In this example I am using the Microsoft Excel 2011 edition.
1. Download Apache Drill from https://drill.apache.org/download/
2. Extract it to some location and start Drill in embedded mode using $DRILL_HOME/bin/drill-embedded
3. Download and configure the MapR Drill ODBC driver according to your environment: https://drill.apache.org/docs/installing-the-driver-on-mac-os-x/
4. Update the ODBC data source configuration as follows
ConnectionType=Direct
HOST=localhost
PORT=31010
ZKQuorum=
ZKClusterID=
5. Open MS Excel, go to Data -> Get External Data -> New Database Query -> select the Drill data source
6. Click Test Query to run your query.

Friday, November 20, 2015

Apache Drill : Saving query output to csv, parquet and json output

A quick way to save query results in different formats (csv, parquet, json), depending on your needs:
0: jdbc:drill:zk=> alter session set `store.format`='csv';
+-------+------------------------+
|  ok   |        summary         |
+-------+------------------------+
| true  | store.format updated.  |
+-------+------------------------+
1 row selected (0.13 seconds)
0: jdbc:drill:zk=> create table dfs.tmp.`emp_csv` as select * from cp.`default`.`employee.json`;
+-----------+----------------------------+
| Fragment  | Number of records written  |
+-----------+----------------------------+
| 0_0       | 1155                       |
+-----------+----------------------------+
1 row selected (0.444 seconds)
0: jdbc:drill:zk=> alter session set `store.format`='parquet';
+-------+------------------------+
|  ok   |        summary         |
+-------+------------------------+
| true  | store.format updated.  |
+-------+------------------------+
1 row selected (0.133 seconds)
0: jdbc:drill:zk=> create table dfs.tmp.`emp_parquet` as select * from cp.`default`.`employee.json`;
+-----------+----------------------------+
| Fragment  | Number of records written  |
+-----------+----------------------------+
| 0_0       | 1155                       |
+-----------+----------------------------+
1 row selected (0.826 seconds)
0: jdbc:drill:zk=> alter session set `store.format`='json';
+-------+------------------------+
|  ok   |        summary         |
+-------+------------------------+
| true  | store.format updated.  |
+-------+------------------------+
1 row selected (0.116 seconds)
0: jdbc:drill:zk=> create table dfs.tmp.`emp_json` as select * from cp.`default`.`employee.json`;
+-----------+----------------------------+
| Fragment  | Number of records written  |
+-----------+----------------------------+
| 0_0       | 1155                       |
+-----------+----------------------------+
1 row selected (0.322 seconds)
0: jdbc:drill:zk=> 

// output
[root@RKS ~]# hadoop fs -ls /tmp/emp*
Found 1 items
-rwxr-xr-x   3 mapr mapr     175674 2015-11-21 01:58 /tmp/emp_csv/0_0_0.csv
Found 1 items
-rwxr-xr-x   3 mapr mapr     563564 2015-11-21 02:02 /tmp/emp_json/0_0_0.json
Found 1 items
-rwxr-xr-x   3 mapr mapr      52179 2015-11-21 02:01 /tmp/emp_parquet/0_0_0.parquet

Wednesday, November 18, 2015

Sqoop Import Error due to 'must contain '$CONDITIONS' in WHERE clause'

Sqoop import as parquet files fails with the following exception when $CONDITIONS is misspelled (here $CONDITION) in the query:
$sqoop import --connect jdbc:mysql://127.0.0.1:3306/mysql --username root  --query 'select * from user where  $CONDITION'  --target-dir  /user/mapr/import-dir --as-parquetfile  -m 1

ERROR tool.ImportTool: Encountered IOException running import job: java.io.IOException: Query [select * from user where 1=1 $CONDITION] must contain '$CONDITIONS' in WHERE clause
at org.apache.sqoop.manager.ConnManager.getColumnTypes(ConnManager.java:300)
 at org.apache.sqoop.orm.ClassWriter.getColumnTypes(ClassWriter.java:1833)
ERROR tool.ImportTool: Encountered IOException running import job: java.io.IOException: No columns to generate for ClassWriter
 at org.apache.sqoop.orm.ClassWriter.generate(ClassWriter.java:1651)
 at org.apache.sqoop.tool.CodeGenTool.generateORM(CodeGenTool.java:107)
 at org.apache.sqoop.tool.ImportTool.importTable(ImportTool.java:478)
The right way is to spell $CONDITIONS correctly and keep the shell from expanding it (single quotes, or an escaped \$ inside double quotes):
sqoop import --connect jdbc:mysql://127.0.0.1:3306/metastore --username root  --query "select * from TBLS where \$CONDITIONS"   --target-dir  /user/mapr/import_dir --as-parquetfile  -m 1
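The quoting rules at play can be seen with plain shell, no Sqoop needed: single quotes, or an escaped \$ inside double quotes, pass the literal $CONDITIONS token through to Sqoop, while an unescaped $CONDITIONS in double quotes is expanded by the shell to an empty string before Sqoop ever sees the query.

```shell
single='select * from TBLS where $CONDITIONS'     # single quotes: no expansion
escaped="select * from TBLS where \$CONDITIONS"   # escaped: no expansion
unescaped="select * from TBLS where $CONDITIONS"  # expands to empty string

echo "$single"     # select * from TBLS where $CONDITIONS
echo "$escaped"    # select * from TBLS where $CONDITIONS
echo "$unescaped"  # select * from TBLS where
```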

Tuesday, November 17, 2015

load csv data to hive partitioned tables

1. Create an external partitioned table at some location

CREATE EXTERNAL TABLE `csvtable`(
  `col1` string,
  `col2` string,
  `col3` string,
  `col4` string)
PARTITIONED BY (
  `year` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.OpenCSVSerde'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  '/mycsvtable';

2.create a directory under it with the name of your partition
hadoop fs -mkdir /mycsvtable/year=2001

3.copy some csv files to the directory
hadoop fs -put sample.csv /mycsvtable/year=2001

4. Hive still does not know about this partition because the metastore has not been updated yet; if you query
csvtable now it will fetch no results. To resolve this, alter the table so that the metastore is updated accordingly:
alter table csvtable add partition (year='2001');
(Alternatively, msck repair table csvtable discovers and adds all missing partitions under the table location.)

Now you are good to Go.

Running Zeppelin notebook on a MapR cluster

Zeppelin is a web-based notebook that enables interactive data analytics. You can make beautiful data-driven, interactive and collaborative documents with SQL, Scala and more. These are the minimal steps to get it running on a MapR 5.0 cluster.
ensure Java is installed
yum install git
yum install npm

# install maven
wget http://www.eu.apache.org/dist/maven/maven-3/3.3.3/binaries/apache-maven-3.3.3-bin.tar.gz
sudo tar -zxf apache-maven-3.3.3-bin.tar.gz -C /usr/local/
sudo ln -s /usr/local/apache-maven-3.3.3/bin/mvn /usr/local/bin/mvn

git clone https://github.com/apache/incubator-zeppelin.git
cd incubator-zeppelin/
mvn clean package -Pmapr50 -DskipTests

# update conf/zeppelin-env.sh
export SPARK_HOME=
export HADOOP_HOME=

#optional configuration
conf/zeppelin-env.sh
conf/zeppelin-site.xml

#Run
bin/zeppelin-daemon.sh start
open a web-browser and browse http://hostname:8080/ 

#To Stop
bin/zeppelin-daemon.sh stop

Tuesday, November 10, 2015

Hive metastore upgrade manually

Env: Hive 0.12 with Derby running in embedded mode, upgraded to Hive 1.0

download derbytools-10.4.2.0.jar
-- dump the current schema
execute java -cp derbytools-10.4.2.0.jar:$HIVE_HOME/lib/derby-10.4.2.0.jar org.apache.derby.tools.dblook -d 'jdbc:derby:metastore_db' > schema-12.sql
(metastore_db is a relative location)

upgrade to Hive 1.0 but don't start it yet.
copy the following files from $HIVE_HOME/scripts/metastore/upgrade/derby/:
upgrade-0.12.0-to-0.13.0.derby.sql
018-HIVE-6757.derby.sql
017-HIVE-6458.derby.sql
016-HIVE-6386.derby.sql
hive-txn-schema-0.13.0.derby.sql
019-HIVE-7784.derby.sql
upgrade-0.13.0-to-0.14.0.derby.sql


download derbytools-10.10.1.1.jar
execute java -cp derbytools-10.10.1.1.jar:derby-10.10.1.1.jar org.apache.derby.tools.ij
connect 'jdbc:derby:metastore_db'
run 'upgrade-0.12.0-to-0.13.0.derby.sql';
run 'upgrade-0.13.0-to-0.14.0.derby.sql';

Now start Hive; the old tables are visible and can be queried without any issue.

Thursday, November 5, 2015

Building Spark Notebook for MapR 5.0

Env : MapR 5.0
Spark : 1.4.1

git clone https://github.com/andypetrella/spark-notebook.git

// add mapr repo to sbt proxy repo
vim ~/.sbt/repositories
[repositories]
local
maven-central
typesafe: http://repo.typesafe.com/typesafe/releases/
typesafe-snapshots: http://repo.typesafe.com/typesafe/snapshots/
mapr: http://repository.mapr.com/maven

cd spark-notebook
sbt -Dspark.version=1.4.1 -Dhadoop.version=2.7.0-mapr-1506 -Dwith.hive=true -Dwith.parquet=true clean dist

add env variables

$ export HADOOP_CONF_DIR=/opt/mapr/hadoop/hadoop-2.7.0/etc/hadoop
$ export EXTRA_CLASSPATH=/opt/mapr/lib/commons-configuration-1.6.jar:/opt/mapr/lib/hadoop-auth-2.7.0.jar:/opt/mapr/lib/maprfs-5.0.0-mapr.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/zookeeper-3.4.5-mapr-1503.jar

copy spark assembly jar to the /apps/spark
hadoop fs -put /opt/mapr/spark/spark-1.4.1/lib/spark-assembly-1.4.1-hadoop2.5.1-mapr-1501.jar /apps/spark/spark-assembly.jar


now start notebook using sbt run
[info] play - Listening for HTTP on /0:0:0:0:0:0:0:0:9000

(Server started, use Ctrl+D to stop and go back to the console...)
The message indicates that everything went smoothly and the notebook has started on port 9000. To access the notebook, open a browser at http://<hostname>:9000 and you are good to go.

Friday, October 23, 2015

Spark Streaming Sample program using scala

mkdir spark-streaming-example
cd spark-streaming-example/
mkdir -p src/main/scala
cd src/main/scala
vim TestStreaming.scala

add the following lines of code to TestStreaming.scala

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds
import org.apache.spark.SparkConf

object TestStreaming{
        def main(args:Array[String]){
                val conf = new SparkConf().setAppName("Streaming Test")
                val ssc = new StreamingContext(conf,Seconds(30))
                val lines = ssc.socketTextStream("hostname",6002)
                val errorLines = lines.filter(_.contains("error"))
                lines.print
                errorLines.print()
                //errorLines.saveAsTextFiles("errorline.txt")
                println("starting streaming context")
                ssc.start()
                println("awaiting termination")
                ssc.awaitTermination()
                println("App done")
        }
}

cd -
vim build.sbt
name := "Spark Streaming Example"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "1.4.1","org.apache.spark" %% "spark-streaming" % "1.4.1")

* now run sbt package from the project home; it will build the jar at target/scala-2.10/spark-streaming-example_2.10-1.0.jar

* run this jar using spark-submit
#bin/spark-submit --class TestStreaming target/scala-2.10/spark-streaming-example_2.10-1.0.jar

to test this program, open a different terminal, run nc -lk `hostname` 6002, hit enter, and type anything on the console; whatever you type will be displayed on the Spark console.

Spark-SQL : How to query a csv file

$/spark-1.4.1/bin/spark-shell --packages com.databricks:spark-csv_2.10:1.1.0
scala> sqlContext
res0: org.apache.spark.sql.SQLContext = org.apache.spark.sql.hive.HiveContext@12c57a43

scala> val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "file:///root/emp.csv","header"->"true"))
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
df: org.apache.spark.sql.DataFrame = [emp_id: string, emp_name: string, country: string, salary: string]

scala>   df.printSchema()
root
 |-- emp_id: string (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- country: string (nullable = true)
 |-- salary: string (nullable = true)


scala> df.registerTempTable("emp")

scala> val names = sqlContext.sql("select emp_name from emp")
names: org.apache.spark.sql.DataFrame = [emp_name: string]

scala> names.foreach(println)
[Guy]
[Jonas]
[Hector]
[Byron]
[Owen]
[Zachery]
[Alden]
[Akeem]

Thursday, October 22, 2015

Spark-SQL : how to query json files

scala> sqlContext
res0: org.apache.spark.sql.SQLContext = org.apache.spark.sql.hive.HiveContext@330305d3

scala> val df = sqlContext.load("org.apache.spark.sql.json", Map("path" -> "file:///employee.json"))
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
df: org.apache.spark.sql.DataFrame = [birth_date: string, department_id: bigint, education_level: string, employee_id: bigint, end_date: string, first_name: string, full_name: string, gender: string, hire_date: string, last_name: string, management_role: string, marital_status: string, position_id: bigint, position_title: string, salary: double, store_id: bigint, supervisor_id: bigint]

scala>  df.printSchema()
root
 |-- birth_date: string (nullable = true)
 |-- department_id: long (nullable = true)
 |-- education_level: string (nullable = true)
 |-- employee_id: long (nullable = true)
 |-- end_date: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- full_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- hire_date: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- management_role: string (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- position_id: long (nullable = true)
 |-- position_title: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- store_id: long (nullable = true)
 |-- supervisor_id: long (nullable = true)


scala> df.registerTempTable("employee")
scala> val names = sqlContext.sql("select first_name from employee limit 5")
names: org.apache.spark.sql.DataFrame = [first_name: string]
scala> names.foreach(println)
[Sheri]
[Derrick]
[Michael]
[Maya]
[Roberta]


Monday, October 19, 2015

Apache Drill -JDBC Storage Plugin Configuration

To query a MySQL data source from Apache Drill, create a new storage plugin from the Drill UI with the following configuration:
{
  "type": "jdbc",
  "driver": "com.mysql.jdbc.Driver",
  "url": "jdbc:mysql://hostname:3306/dbname",
  "username": "root",
  "password": "password",
  "enabled": true
}
That's it; you are now all set to query MySQL from Apache Drill:
select * from plugin_name.table_name;

Sunday, October 11, 2015

Apache Spark : Setup Eclipse (using maven) to Build Spark-Scala project

1. Create maven scala quick start project
mvn archetype:generate -B  -DarchetypeGroupId=pl.org.miki -DarchetypeArtifactId=scala-quickstart-archetype -DarchetypeVersion=0.8.2  -DgroupId=com.example -DartifactId=spark-project -Dversion=1.0 -Dpackage=com.example.project  -DsourceFolders=scala-only


2. Import spark-project into Eclipse using the maven project import wizard.
3. update pom.xml as follows
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.example</groupId>
<artifactId>spark-project</artifactId>
<version>1.0</version>
<packaging>jar</packaging>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.7</java.version>
<scala.version>2.10.4</scala.version>
<scala.version.tools>2.10</scala.version.tools>
</properties>

<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<!-- this is so we don't end with a compile error in maven-compiler-plugin -->     
<phase>process-sources</phase>      
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4.1</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.version.tools}</artifactId>
<version>1.11.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.version.tools}</artifactId>
<version>2.2.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.4.1</version>
</dependency>
</dependencies>
</project>

4. Add the sample scala program to src/main/scala(I have taken the sample from apache spark website)
package com.example.project

/**
* @author rsingh
*/
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "/root/test.txt" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

5. Now build the project using the maven package command
mvn clean package

6. The above command creates a fat jar in the target directory; run the jar using spark-submit
$bin/spark-submit --class com.example.project.SimpleApp --master spark://maprdemo:7077 /root/spark-project-1.0-jar-with-dependencies.jar

Friday, October 2, 2015

Apache Spark : JDBC connectivity with MySQL

ENV: Apache Spark-1.4.1 with spark-shell (start the shell with the MySQL connector jar on the classpath, e.g. --driver-class-path /path/to/mysql-connector-java.jar)
scala> import java.sql.{DriverManager, ResultSet}
scala> import org.apache.spark.rdd.JdbcRDD
scala> def getConnection() = {
     |   Class.forName("com.mysql.jdbc.Driver").newInstance()
     |   DriverManager.getConnection("jdbc:mysql://localhost:3306/test?user=root")
     | }
scala> def convertRS(rs: ResultSet) = (rs.getInt(1), rs.getString(2))
scala> val result = new JdbcRDD(sc, getConnection _, "SELECT * FROM emp WHERE ? <= empid AND empid <= ?", 0, 20, 2, mapRow = convertRS)
scala> result.collect().toList

Apache Spark : Reading and Writing Sequence Files

Writing a Sequence file:
scala> val data = sc.parallelize(List(("key1", 1), ("Kay2", 2), ("Key3", 2)))
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:27
scala> data.saveAsSequenceFile("/tmp/seq-output")

The output can be verified using hadoop ls command
[root@maprdemo sample-data]# hadoop fs -lsr /tmp/seq-output
lsr: DEPRECATED: Please use 'ls -R' instead.
-rwxr-xr-x   1 root root          0 2015-10-02 01:12 /tmp/seq-output/_SUCCESS
-rw-r--r--   1 root root        102 2015-10-02 01:12 /tmp/seq-output/part-00000
-rw-r--r--   1 root root        119 2015-10-02 01:12 /tmp/seq-output/part-00001
[root@maprdemo sample-data]# hadoop fs -text /tmp/seq-output/part-00001
Kay2 2
Key3 2

Reading Sequence file

scala> import org.apache.hadoop.io.Text
import org.apache.hadoop.io.Text
scala> import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.IntWritable
scala> val result = sc.sequenceFile("/tmp/seq-output/part-00001", classOf[Text], classOf[IntWritable]). map{case (x, y) => (x.toString, y.get())}
result: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[15] at map at <console>:29

scala> result.collect
res14: Array[(String, Int)] = Array((Kay2,2), (Key3,2))

Thursday, October 1, 2015

Apache Spark and HBase Integration on MapR Distribution

Env: Spark-1.4.1,HBase 0.98.12 on MapR-5.0

Problems: you can see lots of problem like this
*java.lang.RuntimeException: Error occurred while instantiating com.mapr.fs.hbase.MapRTableMappingRules
*java.lang.ClassNotFoundException: org.apache.hadoop.hbase.client.mapr.BaseTableMappingRules
*error: object HBaseAdmin is not a member of package org.apache.hadoop.hbase.client
*error: object HBaseConfiguration is not a member of package org.apache.hadoop.hbase

to resolve these issues and submit a job from spark-shell, you need to change a few things.

1. edit spark-defaults.conf and add spark.executor.extraClassPath as follows

spark.executor.extraClassPath /opt/mapr/hbase/hbase-0.98.12/lib/hbase-client-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-common-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-hadoop2-compat-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-it-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-protocol-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-server-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/htrace-core-2.04.jar:/opt/mapr/lib/mapr-hbase-5.0.0-mapr.jar

2. start the spark shell with the --driver-class-path as follows
bin/spark-shell --driver-class-path /opt/mapr/hbase/hbase-0.98.12/lib/hbase-client-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-common-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-hadoop2-compat-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-it-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-protocol-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-server-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/htrace-core-2.04.jar:/opt/mapr/lib/mapr-hbase-5.0.0-mapr.jar



Utility to create lots of parquet files using hive

Create table in hive
CREATE TABLE partition_table_par(id INT, username string)
 PARTITIONED BY(year STRING, month STRING,day STRING,eventtype STRING,varfunction STRING,varname STRING)
 STORED AS PARQUET;

Bash script to pump data into the table, which will store it as parquet files
#!/bin/bash

for i in {1..100}
do
echo $i
year=`expr $i + 1996`
yy=\'$year\'
echo $yy
month=`expr $i % 12`
mm=\'$month\'
day=`expr $i % 30 `
dd=\'$day\'
eventtype=eventtype$i
et=\'$eventtype\'
varfunction=varfunction$i
vf=\'$varfunction\'
varname=varname$i
v=\'$varname\'


hive -e 'insert into table partition_table_par PARTITION(year='$yy',month='$mm',day='$dd',eventtype='$et',varfunction='$vf',varname='$v') select 1,'$yy' from test_table limit 1;'
#sleep 1m
done
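The quote gluing in the hive -e line above is worth a closer look: '...'$yy'...' alternates single-quoted literal pieces with unquoted variable expansions, and because $yy itself carries literal quote characters, the assembled HiveQL ends up with quoted partition values. A one-iteration sketch (no Hive needed, just echoing the statement that would be passed to hive -e):

```shell
i=1
year=`expr $i + 1996`   # 1997
yy=\'$year\'            # the value '1997', quotes included
stmt='insert into table partition_table_par PARTITION(year='$yy') select 1 from test_table limit 1;'
echo "$stmt"
# insert into table partition_table_par PARTITION(year='1997') select 1 from test_table limit 1;
```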

Tuesday, September 8, 2015

Java Program to use JPam Authentication

JPam is a Java-PAM bridge. PAM, or Pluggable Authentication Modules, is a standard security architecture used on Linux, Mac OS X, Solaris, HP-UX and other Unix systems. JPam is the missing link between the two.

JPAM permits the use of PAM authentication facilities by Java applications running on those platforms.

These facilities include:

account
auth
password
session

In this example I will demonstrate how to use JPam in your Java application.
1. Download JPam from http://sourceforge.net/projects/jpam/files/
2. Extract the tar on the disk
3. Change to the extracted folder and copy the net-sf-jpam file to /etc/pam.d
4. Create a sample Java program; make sure you have JPam-1.1.jar on the classpath

import net.sf.jpam.Pam;

public class PamAuthenticator {
    public static void main(String[] args) {
        authenticate(args[0], args[1], args[2]);
    }

    public static void authenticate(String user, String password, String profile) {
        Pam pam = new Pam(profile);
        System.out.println("Starting auth with username " + user + " passwd " + password + " for profile " + profile);
        if (!pam.authenticateSuccessful(user, password)) {
            throw new RuntimeException(String.format("PAM profile '%s' validation failed", profile));
        }
        System.out.println("done with " + profile);
    }
}

5. compile and run the test class as follows
java -cp .:* -Djava.library.path=`pwd` PamAuthenticator mapr mapr login

output:
Starting auth with username mapr passwd mapr for profile login
done with login

Wednesday, August 12, 2015

Apache Zookeeper : monitoring ZooKeeper using four-letter commands

Here are a few four-letter commands you can use with ZooKeeper to check its stats and status:

[root@ip-10-0-0-251 ~]# 
[root@ip-10-0-0-251 ~]# echo ruok | nc `hostname` 5181
imok[root@ip-10-0-0-251 ~]# 
[root@ip-10-0-0-251 ~]# echo stat | nc `hostname` 5181
Zookeeper version: 3.4.5-mapr-1406--1, built on 01/19/2015 22:57 GMT
Clients:
 /10.0.0.251:57652[0](queued=0,recved=1,sent=0)
 /10.0.0.251:57623[1](queued=0,recved=9151,sent=9158)
 /10.0.0.251:57643[1](queued=0,recved=9120,sent=9125)
 /10.0.0.251:57622[1](queued=0,recved=9156,sent=9164)
 /10.0.0.251:57624[1](queued=0,recved=12974,sent=12974)
 /10.0.0.251:57620[1](queued=0,recved=9150,sent=9157)
 /10.0.0.251:57644[1](queued=0,recved=7708,sent=7966)
 /10.0.0.251:57626[1](queued=0,recved=9195,sent=9206)
 /10.0.0.251:57619[1](queued=0,recved=9185,sent=9186)
 /10.0.0.251:57621[1](queued=0,recved=9143,sent=9149)

Latency min/avg/max: 0/0/63
Received: 85043
Sent: 85345
Connections: 10
Outstanding: 0
Zxid: 0x136
Mode: standalone
Node count: 59
[root@ip-10-0-0-251 ~]# 
[root@ip-10-0-0-251 ~]# 
[root@ip-10-0-0-251 ~]# echo conf | nc `hostname` 5181
clientPort=5181
dataDir=/opt/mapr/zkdata/version-2
dataLogDir=/opt/mapr/zkdata/version-2
tickTime=2000
maxClientCnxns=100
minSessionTimeout=4000
maxSessionTimeout=40000
serverId=0
[root@ip-10-0-0-251 ~]# echo cons | nc `hostname` 5181
 /10.0.0.251:57623[1](queued=0,recved=9162,sent=9169,sid=0x14f1efc29d70004,lop=PING,est=1439333921242,to=30000,lcxid=0x1d,lzxid=0x136,lresp=1439425306327,llat=0,minlat=0,avglat=0,maxlat=2)
 /10.0.0.251:57643[1](queued=0,recved=9131,sent=9136,sid=0x14f1efc29d70008,lop=PING,est=1439334130760,to=30000,lcxid=0x13,lzxid=0x136,lresp=1439425306958,llat=0,minlat=0,avglat=0,maxlat=2)
 /10.0.0.251:57654[0](queued=0,recved=1,sent=0)
 /10.0.0.251:57622[1](queued=0,recved=9166,sent=9174,sid=0x14f1efc29d70003,lop=PING,est=1439333921221,to=30000,lcxid=0x22,lzxid=0x136,lresp=1439425302064,llat=0,minlat=0,avglat=0,maxlat=5)
 /10.0.0.251:57624[1](queued=0,recved=12989,sent=12989,sid=0x14f1efc29d70005,lop=GETC,est=1439333936044,to=30000,lcxid=0x1aef,lzxid=0x136,lresp=1439425303437,llat=0,minlat=0,avglat=0,maxlat=6)
 /10.0.0.251:57620[1](queued=0,recved=9161,sent=9168,sid=0x14f1efc29d70001,lop=PING,est=1439333921154,to=30000,lcxid=0x1d,lzxid=0x136,lresp=1439425307676,llat=0,minlat=0,avglat=0,maxlat=4)
 /10.0.0.251:57644[1](queued=0,recved=7716,sent=7974,sid=0x14f1efc29d70009,lop=PING,est=1439334142657,to=40000,lcxid=0x394,lzxid=0x136,lresp=1439425303832,llat=0,minlat=0,avglat=0,maxlat=16)
 /10.0.0.251:57626[1](queued=0,recved=9206,sent=9217,sid=0x14f1efc29d70007,lop=PING,est=1439333966309,to=30000,lcxid=0x4f,lzxid=0x136,lresp=1439425306675,llat=0,minlat=0,avglat=0,maxlat=6)
 /10.0.0.251:57619[1](queued=0,recved=9196,sent=9197,sid=0x14f1efc29d70000,lop=PING,est=1439333920963,to=30000,lcxid=0x41,lzxid=0x136,lresp=1439425304945,llat=0,minlat=0,avglat=0,maxlat=63)
 /10.0.0.251:57621[1](queued=0,recved=9153,sent=9159,sid=0x14f1efc29d70002,lop=PING,est=1439333921194,to=30000,lcxid=0x15,lzxid=0x136,lresp=1439425302145,llat=0,minlat=0,avglat=0,maxlat=6)

[root@ip-10-0-0-251 ~]# 
[root@ip-10-0-0-251 ~]# 
[root@ip-10-0-0-251 ~]# echo envi | nc `hostname` 5181
Environment:
zookeeper.version=3.4.5-mapr-1406--1, built on 01/19/2015 22:57 GMT
host.name=ip-10-0-0-251
java.version=1.7.0_45
java.vendor=Oracle Corporation
java.home=/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.45.x86_64/jre
java.class.path=/opt/mapr/zookeeper/zookeeper-3.4.5/bin/../build/classes:/opt/mapr/zookeeper/zookeeper-3.4.5/bin/../build/lib/*.jar:/opt/mapr/zookeeper/zookeeper-3.4.5/bin/../lib/slf4j-log4j12-1.6.1.jar:/opt/mapr/zookeeper/zookeeper-3.4.5/bin/../lib/slf4j-api-1.6.1.jar:/opt/mapr/zookeeper/zookeeper-3.4.5/bin/../lib/netty-3.2.2.Final.jar:/opt/mapr/zookeeper/zookeeper-3.4.5/bin/../lib/log4j-1.2.15.jar:/opt/mapr/zookeeper/zookeeper-3.4.5/bin/../lib/jline-0.9.94.jar:/opt/mapr/zookeeper/zookeeper-3.4.5/bin/../zookeeper-3.4.5-mapr-1406.jar:/opt/mapr/zookeeper/zookeeper-3.4.5/bin/../src/java/lib/*.jar:/opt/mapr/zookeeper/zookeeper-3.4.5/conf::/opt/mapr/lib/maprfs-4.0.2-mapr.jar:/opt/mapr/lib/protobuf-java-2.5.0.jar:/opt/mapr/lib/libprotodefs-4.0.2-mapr.jar:/opt/mapr/lib/baseutils-4.0.2-mapr.jar:/opt/mapr/lib/json-20080701.jar:/opt/mapr/lib/flexjson-2.1.jar:/opt/mapr/lib/commons-codec-1.5.jar
java.library.path=/opt/mapr/lib
java.io.tmpdir=/tmp
java.compiler=
os.name=Linux
os.arch=amd64
os.version=2.6.32-431.el6.x86_64
user.name=mapr
user.home=/home/mapr
user.dir=/
[root@ip-10-0-0-251 ~]# 
[root@ip-10-0-0-251 ~]# 
[root@ip-10-0-0-251 ~]# echo wchc | nc `hostname` 5181
0x14f1efc29d70004
 /services/kvstore/ip-10-0-0-251
 /services/hoststats/ip-10-0-0-251
 /services/hoststats/master
 /nodes/ip-10-0-0-251/services/hoststats
0x14f1efc29d70008
 /nodes/ip-10-0-0-251/services/drill-bits
 /services/drill-bits/ip-10-0-0-251
 /services/drill-bits/master
0x14f1efc29d70003
 /nodes/ip-10-0-0-251/services/cldb
 /services/kvstore/ip-10-0-0-251
 /services/cldb/ip-10-0-0-251
 /datacenter/controlnodes/cldb/active/CLDBRunningMaster
 /services/cldb/master
0x14f1efc29d70005
 /datacenter/controlnodes/cldb/active/CLDBMaster
0x14f1efc29d70009
 /drill/sys.storage_plugins/hive
 /drill/sys.storage_plugins/cp
 /drill/sys.storage_plugins/mongo
 /drill/MyDrillCluster-402-drillbits/9d62d3ba-f9fa-4ad4-9b9c-684b854d41fe
 /drill/sys.storage_plugins/hbase
 /drill/sys.storage_plugins/dfs
0x14f1efc29d70001
 /services/webserver/master
 /nodes/ip-10-0-0-251/services/webserver
 /services/webserver/ip-10-0-0-251
 /services/cldb/master
0x14f1efc29d70007
 /services/drill-bits/master
 /services_config/kvstore/ip-10-0-0-251
 /services/kvstore/ip-10-0-0-251
 /services/services/drill-bits
 /services/services/webserver
 /services_config/drill-bits/ip-10-0-0-251
 /services/cldb/master
 /services/webserver/master
 /services/drill-bits/ip-10-0-0-251
 /services_config/cldb/ip-10-0-0-251
 /datacenter/controlnodes/cldb/active/CLDBMaster
 /services/services/cldb
 /services/services/kvstore
 /services/hoststats/ip-10-0-0-251
 /services/services/hoststats
 /services/cldb/ip-10-0-0-251
 /services_config/hoststats/ip-10-0-0-251
 /services/hoststats/master
 /services/webserver/ip-10-0-0-251
 /services/kvstore/master
 /services_config/webserver/ip-10-0-0-251
0x14f1efc29d70000
 /nodes/ip-10-0-0-251/services/nfs/start
 /nodes/ip-10-0-0-251/services/hbmaster/start
 /nodes/ip-10-0-0-251/services/historyserver/stop
 /nodes/ip-10-0-0-251/sdump
 /nodes/ip-10-0-0-251/services/hbmaster/stop
 /nodes/ip-10-0-0-251/services/nfs/stop
 /nodes/ip-10-0-0-251/services/tasktracker/stop
 /nodes/ip-10-0-0-251/services/fileserver/stop
 /nodes/ip-10-0-0-251/services/resourcemanager/stop
 /nodes/ip-10-0-0-251/services/fileserver/start
 /servers/ip-10-0-0-251
 /nodes/ip-10-0-0-251/services/jobtracker/stop
 /nodes/ip-10-0-0-251/services/nodemanager/start
 /nodes/ip-10-0-0-251/services/nodemanager/stop
 /nodes/ip-10-0-0-251/services/hbregionserver/stop
 /nodes/ip-10-0-0-251/services/historyserver/start
 /nodes/ip-10-0-0-251/services/jobtracker/start
 /nodes/ip-10-0-0-251/services/tasktracker/start
 /datacenter/controlnodes/cldb/active/CLDBRunningMaster
 /nodes/ip-10-0-0-251/services/hbregionserver/start
 /nodes/ip-10-0-0-251/stop
 /nodes/ip-10-0-0-251/services/resourcemanager/start
0x14f1efc29d70002
 /datacenter/license/m7/enabled
 /services/kvstore/ip-10-0-0-251
 /nodes/ip-10-0-0-251/services/kvstore
 /services/kvstore/master


ZooKeeper : create, list and delete znodes using Java

A quick-and-dirty but working way to create, list and delete znodes using the Java ZooKeeper client.
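
The embedded code is not shown here; a minimal sketch of the same idea, assuming the org.apache.zookeeper client jar on the classpath and a ZooKeeper server reachable at localhost:2181 (both are assumptions, adjust for your cluster):

```java
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ZkCrudDemo {
    public static void main(String[] args) throws Exception {
        // connect string and session timeout are example values
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, new Watcher() {
            public void process(WatchedEvent event) { /* ignore events for this demo */ }
        });

        // create a persistent znode with some data
        zk.create("/demo", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // list the children of the root znode
        List<String> children = zk.getChildren("/", false);
        System.out.println("children of / : " + children);

        // delete it again (-1 skips the version check)
        zk.delete("/demo", -1);
        zk.close();
    }
}
```

This is a sketch, not a hardened client: production code would handle ConnectionLoss and watch events instead of ignoring them.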


Monday, August 10, 2015

Quick and dirty way of Accessing Apache Drill using Drill JDBC connectivity

This is a quick and dirty way of accessing Apache Drill using the Drill JDBC driver. Make sure you have the Drill JDBC driver (drill-jdbc-all-1.1.0.jar) on the classpath while running your java program.
Java Class:

package com.test;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;


public class DrillJDBCTest {

    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.drill.jdbc.Driver");
        Connection connection = DriverManager.getConnection("jdbc:drill:zk=node3.mapr.com:5181/drill/my_cluster_com-drillbits");
        Statement st = connection.createStatement();
        ResultSet rs = st.executeQuery("SELECT * from cp.`employee`");
        while (rs.next()) {
            System.out.println(rs.getString(1));
        }
        // release JDBC resources when done
        rs.close();
        st.close();
        connection.close();
    }

}

After compiling and running the class you should see output similar to the following:
:::::::::::::::::::::::::::::::::::::::::::::::::::::::::
 printing  full name of the employee  
:::::::::::::::::::::::::::::::::::::::::::::::::::::::::
Sheri Nowmer
Derrick Whelply
Michael Spence
Maya Gutierrez
Roberta Damstra
DONE


Writing a parquet file using Hadoop mapreduce job

This is a simple example of writing a Parquet file using a Hadoop MapReduce job.
Env: Java 7, Maven 3.0.1, Hadoop 1

Step 1: Create a simple java project and add the repository information and dependencies in the pom.xml

<repository>
  <id>sonatype-nexus-snapshots</id>
  <url>https://oss.sonatype.org/content/repositories/snapshots</url>
  <releases>
    <enabled>false</enabled>
  </releases>
  <snapshots>
    <enabled>true</enabled>
  </snapshots>
</repository>

<dependency>
  <groupId>com.twitter</groupId>
  <artifactId>parquet-common</artifactId>
  <version>1.1.2-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>com.twitter</groupId>
  <artifactId>parquet-encoding</artifactId>
  <version>1.1.2-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>com.twitter</groupId>
  <artifactId>parquet-column</artifactId>
  <version>1.1.2-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>com.twitter</groupId>
  <artifactId>parquet-hadoop</artifactId>
  <version>1.1.2-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>0.13.0</version>
</dependency>

Step 2: Write a Mapper implementation
package com.test;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.hive.ql.io.parquet.writable.BinaryWritable;
import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;
import parquet.io.api.Binary;
public class ParquetMapper extends Mapper<LongWritable, Text, Void, ArrayWritable> {

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        // load the Parquet schema that the driver stored in the job configuration
        DataWritableWriteSupport.getSchema(context.getConfiguration());
    }

    @Override
    public void map(LongWritable n, Text line, Context context) throws IOException, InterruptedException {
        if (line != null && line.getLength() > 0) {
            // each input line is expected to hold two tab-separated string columns
            String[] parts = line.toString().split("\t");
            Writable[] data = new Writable[2];
            for (int i = 0; i < 2; i++) {
                data[i] = new BinaryWritable(Binary.fromString(parts[i]));
            }
            ArrayWritable aw = new ArrayWritable(Writable.class, data);
            context.write(null, aw);
        }
    }
}
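
The mapper assumes each input line holds exactly two tab-separated string columns (for example, name and designation). The parsing step in isolation, as a plain-Java sketch (class and method names are mine, not part of the job):

```java
public class TabSplitDemo {
    // mirrors the mapper's parsing: split on tabs, keep the first two fields
    static String[] firstTwoFields(String line) {
        String[] parts = line.split("\t");
        return new String[] { parts[0], parts[1] };
    }

    public static void main(String[] args) {
        String[] f = firstTwoFields("Sheri Nowmer\tPresident");
        System.out.println(f[0] + " -> " + f[1]); // Sheri Nowmer -> President
    }
}
```

A line with fewer than two fields would throw ArrayIndexOutOfBoundsException in the real mapper too, so the input must be clean two-column TSV.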

Step 3: Finally, implement the job driver class as follows
package com.test;
import static parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static parquet.schema.Type.Repetition.OPTIONAL;
import static parquet.schema.Type.Repetition.REQUIRED;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import parquet.hadoop.ParquetOutputFormat;
import parquet.schema.MessageType;
import parquet.schema.PrimitiveType;

@SuppressWarnings("deprecation")
public class App extends Configured implements Tool {

    public int run(String[] args) throws Exception {

        Path inpath = new Path(args[0]);
        Path outpath = new Path(args[1]);

        Configuration conf = getConf();

        // 128 MB row groups, Snappy-compressed pages
        conf.set(ParquetOutputFormat.BLOCK_SIZE, Integer.toString(128 * 1024 * 1024));
        conf.set(ParquetOutputFormat.COMPRESSION, "SNAPPY");

        // two-column schema: required empname, optional designation
        MessageType messagetype = new MessageType("employee",
                new PrimitiveType(REQUIRED, BINARY, "empname"),
                new PrimitiveType(OPTIONAL, BINARY, "designation"));

        DataWritableWriteSupport.setSchema(messagetype, conf);

        System.out.println("Schema: " + messagetype.toString());

        Job job = new Job(conf, "parquet-convert");

        job.setJarByClass(getClass());
        job.setJobName(getClass().getName());
        job.setMapOutputKeyClass(Void.class);
        job.setMapOutputValueClass(ArrayWritable.class);
        job.setOutputKeyClass(Void.class);
        job.setOutputValueClass(ArrayWritable.class);
        job.setMapperClass(ParquetMapper.class);
        job.setNumReduceTasks(0); // map-only job

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(ParquetOutputFormat.class);

        FileInputFormat.setInputPaths(job, inpath);
        ParquetOutputFormat.setOutputPath(job, outpath);

        ParquetOutputFormat.setWriteSupportClass(job, DataWritableWriteSupport.class);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int returnCode = ToolRunner.run(new Configuration(), new App(), args);
        System.exit(returnCode);
    }

}

Now we are good to go. Build the project using Maven and run the job on your Hadoop cluster. Make sure you give the map tasks enough Java heap to avoid OOM errors.

Reading a parquet file using parquet-tools


Saturday, August 8, 2015

Apache Drill : Creating Simple UDF

In this example I will demonstrate how to create a simple custom UDF in Apache Drill.
Env : Apache Drill 1.1.0
Java : JDK 1.7.0_75

The objective of the function is to take a column value as input and return the length of the string. In this example the input is the employee's full name and the returned value is the length of that name.

Step 1 : Create a simple java project using maven and add the drill-1.1.0 dependencies in the pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.mapr.simplefn.test</groupId>
  <artifactId>my-simple-drill-fn</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>my-simple-drill-fn</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.drill.exec</groupId>
      <artifactId>drill-java-exec</artifactId>
      <version>1.1.0</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-source-plugin</artifactId>
        <version>2.4</version>
        <executions>
          <execution>
            <id>attach-sources</id>
            <phase>package</phase>
            <goals>
              <goal>jar-no-fork</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

Step 2: Create a java class named EmpNameLength that implements the DrillSimpleFunc interface.

package com.mapr.simplefn.test;


import javax.inject.Inject;

import io.netty.buffer.DrillBuf;

import org.apache.drill.exec.expr.DrillSimpleFunc;
import org.apache.drill.exec.expr.annotations.FunctionTemplate;
import org.apache.drill.exec.expr.annotations.Output;
import org.apache.drill.exec.expr.annotations.Param;
import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
import org.apache.drill.exec.expr.holders.VarCharHolder;

@FunctionTemplate(name = "empnlen", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
public class EmpNameLength implements DrillSimpleFunc {

    @Param
    NullableVarCharHolder input;

    @Output
    VarCharHolder out;

    @Inject
    DrillBuf buffer;

    @Override
    public void setup() {
        // nothing to initialize for this function
    }

    @Override
    public void eval() {
        // decode the incoming UTF-8 bytes, then write the length back as a varchar
        String stringValue = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(input.start, input.end, input.buffer);
        int outValue = stringValue.length();
        String outputValue = String.valueOf(outValue);
        out.buffer = buffer;
        out.start = 0;
        out.end = outputValue.getBytes().length;
        buffer.setBytes(0, outputValue.getBytes());
    }

}

Step 3: Add an empty drill-override.conf in the resources folder of the project.
Step 4: Run mvn package, which builds the jar into the target folder; copy the jars to DRILL_HOME/jars/3rdparty/ on each node of the Drill cluster.
Step 5: Restart the Drillbits and run a query as follows.

Expected Result:
0: jdbc:drill:zk=local> select empnlen(full_name) from cp.`employee.json` limit 5;
+---------+
| EXPR$0  |
+---------+
| 12      |
| 15      |
| 14      |
| 14      |
| 15      |
+---------+
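
Stripped of Drill's holder and buffer plumbing, eval() just decodes the UTF-8 bytes and returns the character count as a string. A standalone sketch of that logic (class and method layout are mine):

```java
import java.nio.charset.StandardCharsets;

public class EmpNameLenLogic {
    // what empnlen computes for one value: UTF-8 decode, then length as a string
    static String empnlen(byte[] utf8Bytes) {
        String s = new String(utf8Bytes, StandardCharsets.UTF_8);
        return String.valueOf(s.length());
    }

    public static void main(String[] args) {
        // "Sheri Nowmer" is the first full_name in cp.`employee.json`
        System.out.println(empnlen("Sheri Nowmer".getBytes(StandardCharsets.UTF_8)));
    }
}
```

This matches the first row of the expected result above: "Sheri Nowmer" has 12 characters, so empnlen returns 12.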

Code Snippet to create a table in MapR-Db

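
The snippet itself is not embedded here. MapR-DB tables are created through the standard HBase admin API, so a minimal sketch (assuming the MapR HBase client jars on the classpath; the table path and column family name are made up for illustration) looks like:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateMapRDBTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        // MapR-DB tables are addressed by filesystem path; "/user/mapr/mytable" is an example
        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("/user/mapr/mytable"));
        desc.addFamily(new HColumnDescriptor("cf1"));
        admin.createTable(desc);
        admin.close();
    }
}
```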

Reading/Writing a file on MapR-FS (MapR filesystem) using a java program

In this short example I will demonstrate a java program that reads from and writes to the MapR filesystem.
Step 1: Add the MapR repository and MapR dependencies in the pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.mapr</groupId>
  <artifactId>maprfs-example</artifactId>
  <version>1.0-SNAPSHOT</version>

  <repositories>
    <repository>
      <id>mapr-releases</id>
      <url>http://repository.mapr.com/maven/</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
      <releases>
        <enabled>true</enabled>
      </releases>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>1.0.3-mapr-4.1.0</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-dependency-plugin</artifactId>
        <version>2.10</version>
        <executions>
          <execution>
            <id>copy</id>
            <phase>package</phase>
            <goals>
              <goal>copy</goal>
            </goals>
            <configuration>
              <artifactItems>
                <artifactItem>
                  <groupId>org.apache.hadoop</groupId>
                  <artifactId>hadoop-core</artifactId>
                  <version>1.0.3-mapr-4.1.0</version>
                  <type>jar</type>
                  <overWrite>true</overWrite>
                  <outputDirectory>target</outputDirectory>
                  <destFileName>hadoop-core-1.0.3-mapr-4.1.0.jar</destFileName>
                </artifactItem>
              </artifactItems>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <finalName>${project.name}-${project.version}</finalName>
          <appendAssemblyId>true</appendAssemblyId>
          <descriptors>
            <descriptor>src/main/assembly/assembly.xml</descriptor>
          </descriptors>
        </configuration>
      </plugin>
    </plugins>
  </build>

</project>

Step 2: Java program to read and Write to the filesystem.
import java.net.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
public class ReadWriteMapRFS {
    public static void main(String[] args) throws Exception {
        byte[] buf = new byte[65 * 1024];
        int ac = 0;
        if (args.length != 1) {
            System.out.println("usage: ReadWriteMapRFS pathname");
            return;
        }

        String dirname = args[ac++];

        Configuration conf = new Configuration();
        conf.set("fs.default.name", "maprfs://mapr.cluster.com:7222");
        FileSystem fs = FileSystem.get(conf);

        Path testDir = new Path(dirname + "/testdir");
        boolean rc = fs.mkdirs(testDir);
        if (!rc) {
            System.out.println("unable to create directory with path " + testDir);
            return;
        }
        System.out.println("created a new directory on path " + testDir);

        Path testfile = new Path(testDir + "/testfile.txt");
        System.out.println("now going to create a file inside the directory");

        // create the file with a 512-byte io buffer, replication 1 and a 64 MB block size,
        // then write the 65 KB buffer into it
        FSDataOutputStream fos = fs.create(testfile, true, 512, (short) 1, (long) (64 * 1024 * 1024));
        fos.write(buf);
        fos.close();

        System.out.println("reading a recently created file : " + testfile);
        FSDataInputStream istr = fs.open(testfile);
        int bb = istr.readInt(); // read back the first four bytes to verify the file is readable
        istr.close();
        System.out.println("complete read : DONE");
    }
}

Friday, August 7, 2015

Maven Plugin to create jar with source

A maven plugin to generate a source jar along with the class jar -- run with mvn package

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-source-plugin</artifactId>
    <version>2.4</version>
    <executions>
        <execution>
            <id>attach-sources</id>
            <phase>package</phase>
            <goals>
                <goal>jar-no-fork</goal>
            </goals>
        </execution>
    </executions>
</plugin>

Saturday, January 24, 2015

Apache Hive : Indexes

Indexes are special lookup tables that the database search engine can use to speed up data retrieval: an index is a pointer to data in a table, implemented using data structures such as B-Tree or bitmap. Hive has limited indexing capabilities; you can speed up some data-retrieval operations using indexes, and the index data itself is stored in another Hive table.

The other ways to speed up your queries besides indexing are partitioning and bucketing, which we will cover in coming posts. Indexing is also a good alternative to partitioning when the logical partitions would be too numerous and too small to be useful. Indexes in Hive, like those in relational databases, need to be evaluated carefully: maintaining an index requires extra disk space, and building one has a processing cost.

Creating an Index
hive> create index customer_index on table customers(custid) as 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
> with deferred rebuild
> in table customer_index_table
> COMMENT 'Customer Index on custid';
OK
Time taken: 5.713 seconds

Creating an bitmap Index
hive> create index customer_index_bitmap on table customers(custid)
> as 'BITMAP'
> with deferred rebuild
> COMMENT 'Index on customer table on custid using bitmap';
OK
Time taken: 0.603 seconds

View an Index
hive> show formatted index on customers;                       
OK
idx_name tab_name col_names idx_tab_name idx_type comment


customer_index customers custid customer_index_table compact Customer Index on custid
customer_index_bitmap customers custid orderdb__customers_customer_index_bitmap__ bitmap Index on customer table on custid using bitmap
Time taken: 0.988 seconds, Fetched: 5 row(s)

Rebuild an Index
hive> alter index customer_index on customers rebuild;      
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapred.reduce.tasks=<number>
Starting Job = job_201501241609_0022, Tracking URL = http://RKS-PC:50030/jobdetails.jsp?jobid=job_201501241609_0022
Kill Command = /usr/local/hadoop/libexec/../bin/hadoop job -kill job_201501241609_0022
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2015-01-24 18:32:39,926 Stage-1 map = 0%, reduce = 0%
2015-01-24 18:32:41,975 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 0.81 sec
2015-01-24 18:32:42,978 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 0.81 sec
2015-01-24 18:32:43,982 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 0.81 sec
2015-01-24 18:32:45,011 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 0.81 sec
2015-01-24 18:32:46,071 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 0.81 sec
2015-01-24 18:32:47,078 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 0.81 sec
2015-01-24 18:32:48,093 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 0.81 sec
2015-01-24 18:32:49,159 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 0.81 sec
2015-01-24 18:32:50,245 Stage-1 map = 100%, reduce = 33%, Cumulative CPU 0.81 sec
2015-01-24 18:32:51,305 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 2.24 sec
2015-01-24 18:32:52,309 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 2.24 sec
2015-01-24 18:32:53,385 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 2.24 sec
MapReduce Total cumulative CPU time: 2 seconds 240 msec
Ended Job = job_201501241609_0022
Loading data to table orderdb.customer_index_table
Deleted hdfs://RKS-PC:54310/user/hive/warehouse/orderdb.db/customer_index_table
Table orderdb.customer_index_table stats: [num_partitions: 0, num_files: 1, num_rows: 0, total_size: 8567, raw_data_size: 0]
MapReduce Jobs Launched:
Job 0: Map: 1 Reduce: 1 Cumulative CPU: 2.24 sec HDFS Read: 8569 HDFS Write: 8567 SUCCESS
Total MapReduce CPU Time Spent: 2 seconds 240 msec
OK
Time taken: 42.379 seconds

Drop an Index
hive> drop index if exists customer_index on customers;
OK
Time taken: 3.267 seconds




Apache Hive : HiveQL View

A view is a virtual table based on the result set of an SQL statement; it allows a query to be saved and treated like a table.

A view contains rows and columns, just like a real table. The fields in a view are fields from one or more real tables in the database. You can add SQL functions, WHERE clauses, and JOINs to a view and present the data as if it were coming from a single table. Currently Hive does not support materialized views.

Lets create a view on the join of the customers and orders tables.

hive> create view customer_order as select c.custid,c.custname,o.orderid,o.tracking_id from customers c inner join orders o on c.custid=o.fk_cust_id;
OK
Time taken: 1.352 seconds
hive> select * from customer_order;
Total MapReduce jobs = 1
setting HADOOP_USER_NAME rks
Execution log at: /tmp/rks/.log
2015-01-24 05:40:55 Starting to launch local task to process map join; maximum memory = 932118528
2015-01-24 05:40:56 Processing rows: 101 Hashtable size: 101 Memory usage: 7985904 rate: 0.009
2015-01-24 05:40:56 Dump the hashtable into file: file:/tmp/rks/hive_2015-01-24_17-40-47_808_6049446203223532344/-local-10002/HashTable-Stage-3/MapJoin-mapfile31--.hashtable
2015-01-24 05:40:56 Upload 1 File to: file:/tmp/rks/hive_2015-01-24_17-40-47_808_6049446203223532344/-local-10002/HashTable-Stage-3/MapJoin-mapfile31--.hashtable File size: 6249
2015-01-24 05:40:56 End of local task; Time Taken: 0.895 sec.
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
Mapred Local Task Succeeded . Convert the Join into MapJoin
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_201501241609_0021, Tracking URL = http://RKS-PC:50030/jobdetails.jsp?jobid=job_201501241609_0021
Kill Command = /usr/local/hadoop/libexec/../bin/hadoop job -kill job_201501241609_0021
Hadoop job information for Stage-3: number of mappers: 1; number of reducers: 0
2015-01-24 17:41:02,863 Stage-3 map = 0%, reduce = 0%
2015-01-24 17:41:05,872 Stage-3 map = 100%, reduce = 0%, Cumulative CPU 0.94 sec
2015-01-24 17:41:06,875 Stage-3 map = 100%, reduce = 0%, Cumulative CPU 0.94 sec
2015-01-24 17:41:07,883 Stage-3 map = 100%, reduce = 100%, Cumulative CPU 0.94 sec
MapReduce Total cumulative CPU time: 940 msec
Ended Job = job_201501241609_0021
MapReduce Jobs Launched:
Job 0: Map: 1 Cumulative CPU: 0.94 sec HDFS Read: 8569 HDFS Write: 425 SUCCESS
Total MapReduce CPU Time Spent: 940 msec
OK
1009 Declan Hooper 745651 ULQ37MGX7MW
1018 Nathan Mcintyre 745652 NJJ84QEM7GO
1027 Troy Griffith 745653 UHX76SFB1EP
1036 Clark Frazier 745654 ZZH74UDJ6IC
1045 Tad Cross 745655 TFV46VBX1ZI
1054 Gannon Bradshaw 745656 EHJ68BHA6UU
1063 Walter Figueroa 745657 BNU38NFJ6FO
1072 Brady Mcclure 745658 NBK17XMP9XC
1081 Porter Bowers 745659 XHB61DLY6IK
1090 Jakeem Knight 745660 WNN67FXM2NC
1099 Macaulay Armstrong 745661 VXI39DIZ3HU
Time taken: 20.208 seconds, Fetched: 11 row(s)
hive>


Apache Hive : Joining datasets

Hive supports SQL joins, but only equi-joins. A SQL join combines two or more tables based on some criteria; the most commonly used is the inner join, which returns all the rows from the two datasets where the join condition is met.
Lets see in an example how an inner join works in Hive. To demonstrate it we have two datasets; the first one is the customers dataset, which holds the customer information.
customers.csv
cust_id,cust_name,ship_address,phone,email
1001,Sawyer Thompson,"-16.44456  115.92975",1-808-310-6814,faucibus@lacinia.net
1002,Xenos Campos,"5.69702  -164.57551",1-872-151-8210,dolor.Fusce@Nunc.com
1003,Brandon Castro,"-25.12774  -151.86179",1-283-827-7635,parturient@aliquameu.org
1004,Evan Gordon,"-20.12089  -85.3661",1-801-885-3833,Fusce.fermentum@Integereu.ca
1005,Macon Hopper,"22.30371  134.10815",1-354-448-6576,est.congue@acturpisegestas.net
1006,Christian Tucker,"73.86819  114.71156",1-106-627-3799,at.egestas.a@Fuscealiquam.net
The other dataset is the orders dataset, which holds information about the orders placed by the customers.
orders.csv
orderid,cust_id,item,order_dt,track_id
745651,1009,Cephalexin,08/09/2013,ULQ37MGX7MW
745652,1018,Hydrochlorothiazide,01/01/2015,NJJ84QEM7GO
745653,1027,Sertraline HCl,07/13/2014,UHX76SFB1EP
745654,1036,Simvastatin,01/05/2014,ZZH74UDJ6IC
745655,1045,Lisinopril,04/22/2014,TFV46VBX1ZI
745656,1054,Ibuprofen (Rx),08/22/2015,EHJ68BHA6UU
745657,1063,Suboxone,12/10/2014,BNU38NFJ6FO

We have already created two tables in Hive, named customers and orders, which hold the data for the customers and the orders.
customers table
hive> describe customers;
OK
custid              	int                 	customer id         
custname            	string              	customer name       
address             	string              	customer Address    
phone               	string              	customer phone      
email               	string              	customer email      
Time taken: 0.524 seconds, Fetched: 5 row(s)
orders table
hive> describe orders; 
OK
orderid             	int                 	Order ID            
fk_cust_id          	int                 	Cust ID reffering to customers
item                	string              	Order Item          
order_dt            	string              	Order Date          
tracking_id         	string              	Tracking ID for Order
Time taken: 0.732 seconds, Fetched: 5 row(s)

INNER JOIN
The INNER JOIN keyword selects all rows from both tables as long as there is a match between the columns in both tables.
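
Before running it in Hive, the equi-join semantics can be sketched in plain Java: keep an order row only when its fk_cust_id finds a matching customer. The data below is a tiny made-up subset of the two CSVs:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class InnerJoinDemo {
    // emit "custid name orderid" for every order whose customer exists
    static List<String> innerJoin(Map<Integer, String> customers, Map<Integer, Integer> orderToCust) {
        List<String> rows = new ArrayList<>();
        for (Map.Entry<Integer, Integer> o : orderToCust.entrySet()) {
            String name = customers.get(o.getValue());
            if (name != null) { // no match -> row dropped, unlike an outer join
                rows.add(o.getValue() + " " + name + " " + o.getKey());
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<Integer, String> customers = new LinkedHashMap<>();
        customers.put(1001, "Sawyer Thompson"); // has no order, so never emitted
        customers.put(1009, "Declan Hooper");
        Map<Integer, Integer> orders = new LinkedHashMap<>();
        orders.put(745651, 1009); // orderid -> fk_cust_id
        System.out.println(innerJoin(customers, orders));
    }
}
```
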
hive> select c.custid,c.custname,o.orderid,o.tracking_id from customers c inner join orders o on c.custid=o.fk_cust_id
> ;
Total MapReduce jobs = 1
setting HADOOP_USER_NAME	rks
Execution log at: /tmp/rks/.log
2015-01-24 05:03:25	Starting to launch local task to process map join;	maximum memory = 932118528
2015-01-24 05:03:25	Processing rows:	101	Hashtable size:	101	Memory usage:	8029040	rate:	0.009
2015-01-24 05:03:25	Dump the hashtable into file: file:/tmp/rks/hive_2015-01-24_17-03-19_651_4336759746543942005/-local-10002/HashTable-Stage-3/MapJoin-mapfile01--.hashtable
2015-01-24 05:03:25	Upload 1 File to: file:/tmp/rks/hive_2015-01-24_17-03-19_651_4336759746543942005/-local-10002/HashTable-Stage-3/MapJoin-mapfile01--.hashtable File size: 6249
2015-01-24 05:03:25	End of local task; Time Taken: 0.751 sec.
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
Mapred Local Task Succeeded . Convert the Join into MapJoin
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_201501241609_0017, Tracking URL = http://RKS-PC:50030/jobdetails.jsp?jobid=job_201501241609_0017
Kill Command = /usr/local/hadoop/libexec/../bin/hadoop job  -kill job_201501241609_0017
Hadoop job information for Stage-3: number of mappers: 1; number of reducers: 0
2015-01-24 17:03:31,889 Stage-3 map = 0%,  reduce = 0%
2015-01-24 17:03:34,902 Stage-3 map = 100%,  reduce = 0%, Cumulative CPU 0.92 sec
2015-01-24 17:03:35,906 Stage-3 map = 100%,  reduce = 0%, Cumulative CPU 0.92 sec
2015-01-24 17:03:36,916 Stage-3 map = 100%,  reduce = 100%, Cumulative CPU 0.92 sec
MapReduce Total cumulative CPU time: 920 msec
Ended Job = job_201501241609_0017
MapReduce Jobs Launched: 
Job 0: Map: 1   Cumulative CPU: 0.92 sec   HDFS Read: 8569 HDFS Write: 425 SUCCESS
Total MapReduce CPU Time Spent: 920 msec
OK
1009	Declan Hooper	745651	ULQ37MGX7MW
1018	Nathan Mcintyre	745652	NJJ84QEM7GO
1027	Troy Griffith	745653	UHX76SFB1EP
1036	Clark Frazier	745654	ZZH74UDJ6IC
1045	Tad Cross	745655	TFV46VBX1ZI
1054	Gannon Bradshaw	745656	EHJ68BHA6UU
1063	Walter Figueroa	745657	BNU38NFJ6FO
1072	Brady Mcclure	745658	NBK17XMP9XC
1081	Porter Bowers	745659	XHB61DLY6IK
1090	Jakeem Knight	745660	WNN67FXM2NC
1099	Macaulay Armstrong	745661	VXI39DIZ3HU
Time taken: 17.391 seconds, Fetched: 11 row(s)
hive>

LEFT OUTER JOIN
The LEFT JOIN keyword returns all rows from the left table, with the matching rows from the right table. The result is NULL on the right side when there is no match.
hive> select c.custid,c.custname,o.orderid,o.tracking_id from customers c left outer join orders o on c.custid=o.fk_cust_id;
Total MapReduce jobs = 1
setting HADOOP_USER_NAME	rks
Execution log at: /tmp/rks/.log
2015-01-24 05:08:40	Starting to launch local task to process map join;	maximum memory = 932118528
2015-01-24 05:08:41	Processing rows:	101	Hashtable size:	101	Memory usage:	8133752	rate:	0.009
2015-01-24 05:08:41	Dump the hashtable into file: file:/tmp/rks/hive_2015-01-24_17-08-34_361_1900203016678725125/-local-10002/HashTable-Stage-3/MapJoin-mapfile11--.hashtable
2015-01-24 05:08:41	Upload 1 File to: file:/tmp/rks/hive_2015-01-24_17-08-34_361_1900203016678725125/-local-10002/HashTable-Stage-3/MapJoin-mapfile11--.hashtable File size: 6249
2015-01-24 05:08:41	End of local task; Time Taken: 0.908 sec.
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
Mapred Local Task Succeeded . Convert the Join into MapJoin
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_201501241609_0018, Tracking URL = http://RKS-PC:50030/jobdetails.jsp?jobid=job_201501241609_0018
Kill Command = /usr/local/hadoop/libexec/../bin/hadoop job  -kill job_201501241609_0018
Hadoop job information for Stage-3: number of mappers: 1; number of reducers: 0
2015-01-24 17:08:48,387 Stage-3 map = 0%,  reduce = 0%
2015-01-24 17:08:51,396 Stage-3 map = 100%,  reduce = 0%, Cumulative CPU 0.88 sec
2015-01-24 17:08:52,400 Stage-3 map = 100%,  reduce = 0%, Cumulative CPU 0.88 sec
2015-01-24 17:08:53,408 Stage-3 map = 100%,  reduce = 100%, Cumulative CPU 0.88 sec
MapReduce Total cumulative CPU time: 880 msec
Ended Job = job_201501241609_0018
MapReduce Jobs Launched: 
Job 0: Map: 1   Cumulative CPU: 0.88 sec   HDFS Read: 8569 HDFS Write: 2629 SUCCESS
Total MapReduce CPU Time Spent: 880 msec
OK
NULL	cust_name	NULL	NULL
1001	Sawyer Thompson	NULL	NULL
1002	Xenos Campos	NULL	NULL
1003	Brandon Castro	NULL	NULL
1004	Evan Gordon	NULL	NULL
1005	Macon Hopper	NULL	NULL
1006	Christian Tucker	NULL	NULL
1007	Rafael Erickson	NULL	NULL
1008	Brent Roth	NULL	NULL
1009	Declan Hooper	745651	ULQ37MGX7MW
1010	Neil Leon	NULL	NULL

RIGHT OUTER JOIN
The RIGHT JOIN keyword returns all rows from the right table , with the matching rows in the left table. The result is NULL in the left side when there is no match.
hive> select c.custid,c.custname,o.orderid,o.tracking_id from customers c right outer join orders o on c.custid=o.fk_cust_id;
Total MapReduce jobs = 1
setting HADOOP_USER_NAME	rks
Execution log at: /tmp/rks/.log
2015-01-24 05:10:50	Starting to launch local task to process map join;	maximum memory = 932118528
2015-01-24 05:10:51	Processing rows:	101	Hashtable size:	101	Memory usage:	7971568	rate:	0.009
2015-01-24 05:10:51	Dump the hashtable into file: file:/tmp/rks/hive_2015-01-24_17-10-44_697_521683568687053567/-local-10002/HashTable-Stage-3/MapJoin-mapfile20--.hashtable
2015-01-24 05:10:51	Upload 1 File to: file:/tmp/rks/hive_2015-01-24_17-10-44_697_521683568687053567/-local-10002/HashTable-Stage-3/MapJoin-mapfile20--.hashtable File size: 6317
2015-01-24 05:10:51	End of local task; Time Taken: 0.712 sec.
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_201501241609_0019, Tracking URL = http://RKS-PC:50030/jobdetails.jsp?jobid=job_201501241609_0019
Kill Command = /usr/local/hadoop/libexec/../bin/hadoop job  -kill job_201501241609_0019
Hadoop job information for Stage-3: number of mappers: 1; number of reducers: 0
2015-01-24 17:10:58,019 Stage-3 map = 0%,  reduce = 0%
2015-01-24 17:11:01,064 Stage-3 map = 100%,  reduce = 0%, Cumulative CPU 0.91 sec
2015-01-24 17:11:02,067 Stage-3 map = 100%,  reduce = 0%, Cumulative CPU 0.91 sec
2015-01-24 17:11:03,073 Stage-3 map = 100%,  reduce = 100%, Cumulative CPU 0.91 sec
MapReduce Total cumulative CPU time: 910 msec
Ended Job = job_201501241609_0019
MapReduce Jobs Launched: 
Job 0: Map: 1   Cumulative CPU: 0.91 sec   HDFS Read: 5205 HDFS Write: 2668 SUCCESS
Total MapReduce CPU Time Spent: 910 msec
OK
NULL	NULL	NULL	track_id
1009	Declan Hooper	745651	ULQ37MGX7MW
1018	Nathan Mcintyre	745652	NJJ84QEM7GO
1027	Troy Griffith	745653	UHX76SFB1EP
1036	Clark Frazier	745654	ZZH74UDJ6IC
1045	Tad Cross	745655	TFV46VBX1ZI
1054	Gannon Bradshaw	745656	EHJ68BHA6UU
1063	Walter Figueroa	745657	BNU38NFJ6FO
1072	Brady Mcclure	745658	NBK17XMP9XC
1081	Porter Bowers	745659	XHB61DLY6IK
1090	Jakeem Knight	745660	WNN67FXM2NC
1099	Macaulay Armstrong	745661	VXI39DIZ3HU
NULL	NULL	745662	DKP00ZCS6FU
NULL	NULL	745663	YSJ42ZXP5ZG
NULL	NULL	745664	OBT90SWM3FN
NULL	NULL	745665	YVJ22BYO5DT
NULL	NULL	745666	DXY85QAL1BE
NULL	NULL	745667	THJ12NCF3KR

FULL OUTER JOIN
The FULL OUTER JOIN returns all rows from both the left table and the right table; it combines the results of the LEFT and RIGHT joins.
hive> 
> select c.custid,c.custname,o.orderid,o.tracking_id from customers c full outer join orders o on c.custid=o.fk_cust_id; 
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapred.reduce.tasks=<number>
Starting Job = job_201501241609_0020, Tracking URL = http://RKS-PC:50030/jobdetails.jsp?jobid=job_201501241609_0020
Kill Command = /usr/local/hadoop/libexec/../bin/hadoop job  -kill job_201501241609_0020
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1
2015-01-24 17:12:46,443 Stage-1 map = 0%,  reduce = 0%
2015-01-24 17:12:50,465 Stage-1 map = 50%,  reduce = 0%, Cumulative CPU 1.08 sec
2015-01-24 17:12:51,470 Stage-1 map = 50%,  reduce = 0%, Cumulative CPU 1.08 sec
2015-01-24 17:12:52,478 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.2 sec
2015-01-24 17:12:53,488 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.2 sec
2015-01-24 17:12:54,498 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.2 sec
2015-01-24 17:12:55,504 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.2 sec
2015-01-24 17:12:56,512 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.2 sec
2015-01-24 17:12:57,521 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.2 sec
2015-01-24 17:12:58,531 Stage-1 map = 100%,  reduce = 33%, Cumulative CPU 2.89 sec
2015-01-24 17:12:59,538 Stage-1 map = 100%,  reduce = 33%, Cumulative CPU 2.89 sec
2015-01-24 17:13:00,545 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 3.85 sec
2015-01-24 17:13:01,551 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 3.85 sec
2015-01-24 17:13:02,560 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 3.85 sec
MapReduce Total cumulative CPU time: 3 seconds 850 msec
Ended Job = job_201501241609_0020
MapReduce Jobs Launched: 
Job 0: Map: 2  Reduce: 1   Cumulative CPU: 3.85 sec   HDFS Read: 13774 HDFS Write: 4872 SUCCESS
Total MapReduce CPU Time Spent: 3 seconds 850 msec
OK
NULL	cust_name	NULL	NULL
NULL	NULL	NULL	track_id
1001	Sawyer Thompson	NULL	NULL
1002	Xenos Campos	NULL	NULL
1003	Brandon Castro	NULL	NULL
1004	Evan Gordon	NULL	NULL
1005	Macon Hopper	NULL	NULL
1006	Christian Tucker	NULL	NULL
1007	Rafael Erickson	NULL	NULL
1008	Brent Roth	NULL	NULL
1009	Declan Hooper	745651	ULQ37MGX7MW
1010	Neil Leon	NULL	NULL
1011	Lionel Vaughan	NULL	NULL
1012	Dillon Johns	NULL	NULL
1013	Davis Fisher	NULL	NULL
1014	Isaac Fields	NULL	NULL
1015	Micah Figueroa	NULL	NULL
1016	Burke Merrill	NULL	NULL
1017	Felix Ward	NULL	NULL
1018	Nathan Mcintyre	745652	NJJ84QEM7GO
1019	Perry Bullock	NULL	NULL
1020	Ali Kramer	NULL	NULL
1021	Timothy Avila	NULL	NULL
1022	Jason Wolfe	NULL	NULL

Monday, January 12, 2015

Apache Hive : HiveQL loading data into a table and querying it

Hive has no row-level insert, update, or delete operations, so the only way to put
data into a table is to use one of the "bulk" load operations, or to simply write files
into the correct directories by other means. In this example we will see how you can load data into a Hive table.

First create a managed table, defining ROW FORMAT and FIELDS TERMINATED BY ',' so the data is parsed correctly when it is loaded into the table. Then load the data using LOAD DATA LOCAL INPATH.
Running a query with a WHERE predicate will launch a MapReduce job and give you the results.
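The steps above can be sketched in HiveQL; the table name, columns, and file path below are hypothetical:

```sql
-- managed table whose rows are parsed as comma-separated fields (hypothetical schema)
CREATE TABLE employees (
  empid  INT,
  name   STRING,
  salary FLOAT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';

-- bulk-load a local file into the table (the path is an assumption)
LOAD DATA LOCAL INPATH '/tmp/employees.csv'
OVERWRITE INTO TABLE employees;

-- a query with a WHERE predicate launches a MapReduce job
SELECT empid, name FROM employees WHERE salary > 50000;
```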

Sunday, January 11, 2015

Apache Hive : Getting started with HiveQL

HiveQL is the Hive query language. It does not conform to ANSI SQL in the way other databases' query languages do. Hive does not support row-level insert, update, and delete, and it does not support transactions. HiveQL supports creation and alteration of databases and supports the DROP DDL too. CREATE, ALTER, and DROP DDL can be applied to the other Hive database objects as well, e.g. tables, views, indexes, and functions.
In this post we will try to run some of the DDL statements on the Hive database.

Create Database:
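The original post showed this step as a screenshot; an equivalent DDL sketch (the database name is hypothetical):

```sql
-- create a database and switch to it
CREATE DATABASE IF NOT EXISTS demo_db
COMMENT 'sample database';

SHOW DATABASES;
USE demo_db;
```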



Managed Tables vs External Tables

Hive controls the life cycle of a managed table's data; by default it stores the data of a managed table under the directory /user/hive/warehouse. As soon as we drop a managed table, Hive deletes the data inside the table, so conceptually Hive has ownership of a managed table's data.
Hive also gives the user the flexibility to define an External table that points to the data but does not take ownership of it. This is a handy way to share data among various tools for analytics. An external table can be defined using the EXTERNAL keyword, with the LOCATION keyword to locate the table's data.

Create Table : Managed Table
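In place of the original screenshot, a minimal managed-table sketch with hypothetical columns:

```sql
-- managed table: Hive owns the data under /user/hive/warehouse,
-- and DROP TABLE removes both the metadata and the data
CREATE TABLE orders_managed (
  orderid     INT,
  tracking_id STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';
```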




Create Table : External Table
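Again in place of the screenshot, a sketch of an external table; the location and columns are assumptions:

```sql
-- external table: Hive tracks only the schema; DROP TABLE leaves the files in place
CREATE EXTERNAL TABLE orders_external (
  orderid     INT,
  tracking_id STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/data/orders';
```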



Create Table : Partitioned Table
To speed up queries Hive uses the concept of partitioning, in which a table is divided horizontally into multiple parts. Partitions are essentially horizontal slices of data which allow larger sets of data to be separated into more manageable chunks, so that a SELECT with a predicate on the partition column only has to look into the target partition.
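A partitioned-table sketch along the lines described above; the table, columns, and partition value are hypothetical:

```sql
-- each distinct country value becomes its own directory (partition)
CREATE TABLE customers_by_country (
  custid   INT,
  custname STRING
)
PARTITIONED BY (country STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';

-- a predicate on the partition column lets Hive read only that partition
SELECT custid, custname FROM customers_by_country WHERE country = 'US';
```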

Apache Hive : Configuring Hive MetaStore using JDBC

Hive requires a metastore that stores metadata (e.g. table schemas, partitions, SerDe information) so that users can run DDL and DML commands in Hive.
Hive also has an embedded metastore in the form of a Derby database, but it is not suitable for concurrent access or heavy usage.
Hive gives you the flexibility to configure any JDBC-compliant database as the metastore, such as MySQL or PostgreSQL. As MySQL is the most popular choice for the Hive metastore, in this post I will demonstrate how to configure MySQL as the Hive metastore.

To configure MySQL as the Hive metastore, first install and start the MySQL server; here is my first step, installing the MySQL server on Ubuntu:

sudo apt-get install mysql-server

Download the MySQL connector jar and place it into $HIVE_HOME/lib.

Now edit conf/hive-site.xml as follows:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://hostname/hivedb?createDatabaseIfNotExist=true</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>username</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>password</value>
  </property>
</configuration>

If your metastore is running on a remote machine, add the following property to conf/hive-site.xml:

<property>
  <name>hive.metastore.uris</name>
  <value>thrift://remotehostname:9083</value>
</property>

Now you are good to go with the MySQL metastore. To start an external Hive metastore service, use the command
hive --service metastore &


Saturday, January 10, 2015

Apache Hive : Installation and configuration of Hive


To install Hive you can follow my old post on building Hive from source if you want to do some customization of the source code. If you are a newbie to Hive, I recommend you download a stable version of Hive from the Apache Hive website and extract the tarball to your preferred location.

I downloaded the hive-0.11.0 tarball from the Apache Hive website and extracted it to /usr/local/hive/hive-0.11.0. That is all you need to install Hive on your machine; make sure that Hadoop is already installed, as Hive uses the environment variable HADOOP_HOME to locate the Hadoop jars and configuration files.

$cd /usr/local
$mkdir hive
$cd hive
$curl -O http://archive.apache.org/dist/hive/hive-0.11.0/hive-0.11.0-bin.tar.gz
$tar -xzf hive-0.11.0-bin.tar.gz

Now set the environment variables HADOOP_HOME and HIVE_HOME: open .bashrc in an editor, e.g. vi, and add these lines to the file:
export HADOOP_HOME=/usr/local/hadoop
export PATH=$PATH:$HADOOP_HOME/bin
export HIVE_HOME=/usr/local/hive/hive-0.11.0
export PATH=$PATH:$HIVE_HOME/bin
The $HIVE_HOME/bin directory contains executable scripts that launch various Hive services, including the hive command-line interface.
The Hive binaries also contain the Thrift service, which provides access to other agents or processes; on top of Thrift, Hive provides access via JDBC and ODBC.
One of the most important parts of a Hive installation is the metastore, which Hive uses to store table schemas and other metadata. Hive has a built-in Derby database for this, but you are also free to configure an RDBMS of your choice to provide the metastore service; typical Hive installations use MySQL or PostgreSQL.

Now you can check your installation by starting the Hive CLI:
$cd $HIVE_HOME
$hive
Logging initialized using configuration in jar:file:/usr/local/hive/hive-0.11.0/lib/hive-common-0.11.0.jar!/hive-log4j.properties
Hive history file=/tmp/rks/hive_job_log_rks_2867@RKS-PC_201501101811_648770952.txt
hive> 






Apache Hive : Building Hive from source

It is fine to download an Apache Hive release from the Apache website and install it in your development environment, but sometimes it is good to experiment with the Apache Hive source and then install that in your development environment.

In this post I will help you build Hive from source. The minimum requirements are an SVN installation on your machine, JDK 6 or 7, and an Apache Maven installation.

Just follow the instructions below to build Hive from source.

# checkout some stable version of hive from svn branch
$svn co http://svn.apache.org/repos/asf/hive/branches/branch-0.11 hive-11

# change location to directory hive-11
$cd hive-11

# now build the source with the maven command; I have skipped the test cases and enabled debug and error output to diagnose build problems if they occur
$mvn clean install -Phadoop-2,dist -Dmaven.test.skip=true -e -X

# that's all you need to build Hive from source; look into the packaging directory to find the build
$cd packaging/target/apache-hive-0.13.1-bin/apache-hive-0.13.1-bin
$ls
bin  conf  examples  hcatalog  lib  LICENSE  NOTICE  README.txt  RELEASE_NOTES.txt  scripts


Sunday, January 4, 2015

Bloom filter using Google Guava

A Bloom filter is a data structure designed to tell you, rapidly and memory-efficiently, whether an element is present in a set. It does not actually store the elements of the set; it is a probabilistic data structure which tells us that an element either definitely is not in the set or may be in the set.

The base data structure of a Bloom filter is a bit vector.
How a Bloom filter works
Adding an element to the Bloom filter:
Hash the element several times with some hashing technique, and in the bit vector set the bits to 1 at the indexes given by the hash results.

Testing whether an element is in the set:
Apply the same hashing technique and check whether the corresponding bits are set to 1 or 0.
A Bloom filter can guarantee that an element does not exist: if the bits are not set, it is simply impossible for the element to be in the set. However, a positive answer means either the element is in the set or a hashing collision occurred.
/**
 * 
 */
package com.rajkrrsingh.test.guava;

import com.google.common.base.Charsets;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnel;
import com.google.common.hash.PrimitiveSink;

/**
 * @author rks
 * @03-Jan-2015
 */
public class GuavaBloomFilterTest {

    // Funnel feeds an Employee's fields into the filter's hasher;
    // 100 is the expected number of insertions
    private static BloomFilter<Employee> bloomFilter = BloomFilter.create(new Funnel<Employee>() {

        @Override
        public void funnel(Employee emp, PrimitiveSink into) {
            into.putString(emp.getEmpid(), Charsets.UTF_8)
                .putString(emp.getEmpName(), Charsets.UTF_8)
                .putInt(emp.getAge());
        }
    }, 100);

    public static void main(String[] args) {

        for (Employee e : Employee.getEmployeeList()) {
            GuavaBloomFilterTest.bloomFilter.put(e);
        }

        boolean mightContain = GuavaBloomFilterTest.bloomFilter.mightContain(new Employee("101", "RKS", 10000, 31));
        System.out.println(mightContain);

        // negative test: an employee that was never added
        boolean mightContain1 = GuavaBloomFilterTest.bloomFilter.mightContain(new Employee("102", "RKS", 10000, 31));
        System.out.println(mightContain1);
    }
}

Google Guava Cache Implementation

Caches are very useful in a range of use cases: for frequently used data, you can read from the cache instead of performing some compute-intensive operation, which helps you achieve good performance.

You can build a cache using a simple HashMap, where you put your data in key/value form. The putIfAbsent() method is very handy for updating the map when a lookup for the key fails.

In a multithreaded environment you can suffer dirty reads if you use a plain HashMap; there you need ConcurrentHashMap, which provides the necessary concurrency.

ConcurrentMap-based caching is good, but you need to implement a user-defined eviction policy to remove elements which are not used frequently.
The Google Guava API provides more flexible caching based on ConcurrentMap, with a cleaner implementation, with the limitation that it does not store data in files or on some server, e.g. Memcached. Apart from the eviction policy, it also supports the putIfAbsent scenario well.

In the following example let's see how simple it is to create and use a Guava-based cache.
package com.rajkrrsingh.test.guava;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public class GuavaCache {
 
  private static final long MAX_SIZE = 100;
 
  private final LoadingCache<String, String> cache;
 
  public GuavaCache() {
    cache = CacheBuilder.newBuilder().maximumSize( MAX_SIZE ).build( new CacheLoader<String, String>() {
        @Override
        public String load( String key ) throws Exception {
          return key.toUpperCase();
        }
      }
    );
  }
 
  public String getEntry( String key ) {
    return cache.getUnchecked( key );
  }
 
  
  public static void main(String[] args) {
    GuavaCache gcache = new GuavaCache();
    System.out.println(gcache.getEntry("hello"));
    System.out.println(gcache.cache.size());
    for (int i = 0; i < 150; i++) {
      System.out.println(gcache.getEntry("hello" + i));
      System.out.println(gcache.cache.size());
    }
    // checking the eviction policy: "hello" may have been evicted once the cache passed MAX_SIZE
    System.out.println(gcache.cache.getIfPresent("hello"));
  }
}

Google Guava API in one glance: Throwables

public final class Throwables extends Object
Static utility methods pertaining to instances of Throwable.

static List<Throwable> getCausalChain(Throwable throwable)
    Gets a Throwable cause chain as a list.
static Throwable getRootCause(Throwable throwable)
    Returns the innermost cause of throwable.
static String getStackTraceAsString(Throwable throwable)
    Returns a string containing the result of toString(), followed by the full, recursive stack trace of throwable.
static RuntimeException propagate(Throwable throwable)
    Propagates throwable as-is if it is an instance of RuntimeException or Error, or else, as a last resort, wraps it in a RuntimeException and then propagates.
static <X extends Throwable> void propagateIfInstanceOf(Throwable throwable, Class<X> declaredType)
    Propagates throwable exactly as-is, if and only if it is an instance of declaredType.
static void propagateIfPossible(Throwable throwable)
    Propagates throwable exactly as-is, if and only if it is an instance of RuntimeException or Error.
static <X extends Throwable> void propagateIfPossible(Throwable throwable, Class<X> declaredType)
    Propagates throwable exactly as-is, if and only if it is an instance of RuntimeException, Error, or declaredType.
static <X1 extends Throwable, X2 extends Throwable> void propagateIfPossible(Throwable throwable, Class<X1> declaredType1, Class<X2> declaredType2)
    Propagates throwable exactly as-is, if and only if it is an instance of RuntimeException, Error, declaredType1, or declaredType2.
Let's see how Throwables works with the help of some sample code.
/**
 *
 */
package com.rajkrrsingh.test.guava;

import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;

import com.google.common.base.Throwables;

/**
 * @author rks
 * @04-Jan-2015
 */
public class GuavaThrowablesDemo {

    public static void main(String[] args) throws Exception {
        throwablesDemo();
    }

    // working with Throwables
    public static void throwablesDemo() throws Exception {

        try {
            try {
                try {
                    throw new RuntimeException("Root exception");
                } catch (Exception e) {
                    throw new SQLException("Middle tier exception", e);
                }
            } catch (Exception e) {
                throw new IllegalStateException("outer exception", e);
            }
        } catch (Exception e) {
            // getting the root exception
            System.out.println(Throwables.getRootCause(e).getMessage());
            // list of exceptions in the causal chain, outermost first
            List<Throwable> list = Throwables.getCausalChain(e);
            Iterator<Throwable> itr = list.iterator();
            while (itr.hasNext()) {
                System.out.println(itr.next().getMessage());
            }
            // get the stack trace as a string
            System.out.println(Throwables.getStackTraceAsString(e));
        }
    }

}