Friday, October 23, 2015

Spark Streaming sample program using Scala

mkdir spark-streaming-example
cd spark-streaming-example/
mkdir -p src/main/scala
cd src/main/scala
vim TestStreaming.scala

Add the following code to TestStreaming.scala:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds
import org.apache.spark.SparkConf

object TestStreaming {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Streaming Test")
    val ssc = new StreamingContext(conf, Seconds(30))
    // replace "hostname" with the host where the nc server (below) is running
    val lines = ssc.socketTextStream("hostname", 6002)
    val errorLines = lines.filter(_.contains("error"))
    errorLines.print()
    println("starting streaming context")
    ssc.start()
    println("awaiting termination")
    ssc.awaitTermination()
    println("App done")
  }
}

cd -    # back to the project home (spark-streaming-example)
vim build.sbt
name := "Spark Streaming Example"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "1.4.1","org.apache.spark" %% "spark-streaming" % "1.4.1")

* Now run sbt package from the project home; it will build the jar target/scala-2.10/spark-streaming-example_2.10-1.0.jar

* Run this jar using spark-submit (when testing locally, give the master at least two threads, e.g. --master local[2], so the socket receiver and the processing each get one)
# bin/spark-submit --class TestStreaming target/scala-2.10/spark-streaming-example_2.10-1.0.jar

To test this program, open a different terminal, run nc -lk `hostname` 6002, hit enter, and type anything on the console; lines containing "error" will show up on the Spark console at each batch interval.
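For example, typing the lines below in the nc terminal (just illustrative sample input), only the first and third contain "error", so only those are printed by the job at the next 30-second batch:

$ nc -lk `hostname` 6002
disk error on node1
all services healthy
read error on node2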

Spark-SQL : How to query a CSV file

$/spark-1.4.1/bin/spark-shell --packages com.databricks:spark-csv_2.10:1.1.0
scala> sqlContext
res0: org.apache.spark.sql.SQLContext = org.apache.spark.sql.hive.HiveContext@12c57a43

scala> val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "file:///root/emp.csv","header"->"true"))
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
df: org.apache.spark.sql.DataFrame = [emp_id: string, emp_name: string, country: string, salary: string]

scala> df.printSchema()
root
 |-- emp_id: string (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- country: string (nullable = true)
 |-- salary: string (nullable = true)

scala> df.registerTempTable("emp")

scala> val names = sqlContext.sql("select emp_name from emp")
names: org.apache.spark.sql.DataFrame = [emp_name: string]

scala> names.foreach(println)
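The load(source, options) call is what triggers the deprecation warning above; with the same spark-csv package the DataFrameReader form is equivalent (same path and header option assumed):

scala> val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("file:///root/emp.csv")

Also note that names.foreach(println) runs on the executors, so on a cluster the rows may land in the executor logs rather than the shell; for small results, names.collect().foreach(println) or names.show() prints them on the driver.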

Thursday, October 22, 2015

Spark-SQL : How to query JSON files

scala> sqlContext
res0: org.apache.spark.sql.SQLContext = org.apache.spark.sql.hive.HiveContext@330305d3

scala> val df = sqlContext.load("org.apache.spark.sql.json", Map("path" -> "file:///employee.json"))
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
df: org.apache.spark.sql.DataFrame = [birth_date: string, department_id: bigint, education_level: string, employee_id: bigint, end_date: string, first_name: string, full_name: string, gender: string, hire_date: string, last_name: string, management_role: string, marital_status: string, position_id: bigint, position_title: string, salary: double, store_id: bigint, supervisor_id: bigint]

scala> df.printSchema()
root
 |-- birth_date: string (nullable = true)
 |-- department_id: long (nullable = true)
 |-- education_level: string (nullable = true)
 |-- employee_id: long (nullable = true)
 |-- end_date: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- full_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- hire_date: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- management_role: string (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- position_id: long (nullable = true)
 |-- position_title: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- store_id: long (nullable = true)
 |-- supervisor_id: long (nullable = true)

scala> df.registerTempTable("employee")
scala> val names = sqlContext.sql("select first_name from employee limit 5")
names: org.apache.spark.sql.DataFrame = [first_name: string]
scala> names.foreach(println)
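Here too, the non-deprecated DataFrameReader API does the same thing:

scala> val df = sqlContext.read.json("file:///employee.json")
scala> df.registerTempTable("employee")
scala> sqlContext.sql("select first_name from employee limit 5").show()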

Monday, October 19, 2015

Apache Drill -JDBC Storage Plugin Configuration

To query a MySQL data source from Apache Drill, open the Drill web UI, go to the Storage tab, and create a new plugin with the following configuration (the MySQL JDBC driver jar must be on Drill's classpath, e.g. in its jars/3rdparty directory):
{
  "type": "jdbc",
  "driver": "com.mysql.jdbc.Driver",
  "url": "jdbc:mysql://hostname:3306/dbname",
  "username": "root",
  "password": "password",
  "enabled": true
}
That's it; you are now all set to query MySQL from Apache Drill:
select * from plugin_name.table_name;
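For example, from the Drill shell (the plugin and table names below are placeholders for whatever you configured):

$ bin/sqlline -u jdbc:drill:zk=local
0: jdbc:drill:zk=local> select * from plugin_name.table_name limit 10;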

Sunday, October 11, 2015

Apache Spark : Setup Eclipse (using maven) to Build Spark-Scala project

1. Create a Maven Scala quickstart project:
mvn archetype:generate -B -DarchetypeArtifactId=scala-quickstart-archetype -DarchetypeVersion=0.8.2  -DgroupId=com.example -DartifactId=spark-project -Dversion=1.0 -Dpackage=com.example.project  -DsourceFolders=scala-only

2. Import spark-project into Eclipse using the maven project import wizard.
3. Update pom.xml: keep the archetype-generated coordinates and plugins, and add the Spark dependency (building the fat jar used in step 6 also needs an assembly plugin; see the sketch after step 6).
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <!-- the archetype-generated coordinates and Scala build plugins stay here;
       the maven-compiler-plugin entry is so we don't end with a compile error -->
  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.4.1</version>
    </dependency>
  </dependencies>
</project>
4. Add the sample Scala program to src/main/scala (I have taken the sample from the Apache Spark website):
package com.example.project

// @author rsingh
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "/root/test.txt" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
    sc.stop()
  }
}

5. Now build your project using the Maven package command:
mvn clean package

6. The above command will create a fat jar in the target directory; run that jar using spark-submit:
$bin/spark-submit --class com.example.project.SimpleApp --master spark://maprdemo:7077 /root/spark-project-1.0-jar-with-dependencies.jar
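Note that mvn package only produces the *-jar-with-dependencies.jar used above if the pom configures a fat-jar plugin; a minimal maven-assembly-plugin sketch (placed inside <build><plugins>) would be:

<plugin>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
  </configuration>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>single</goal></goals>
    </execution>
  </executions>
</plugin>

If you don't have a standalone master running, the same jar can also be run locally:

$bin/spark-submit --class com.example.project.SimpleApp --master local[2] target/spark-project-1.0-jar-with-dependencies.jar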

Friday, October 2, 2015

Apache Spark : JDBC connectivity with MySQL

ENV: Apache Spark-1.4.1 with spark-shell
scala> import java.sql.{Connection, DriverManager, ResultSet}
scala> import org.apache.spark.rdd.JdbcRDD

scala> def getConnection(): Connection = {
     |   Class.forName("com.mysql.jdbc.Driver").newInstance()
     |   DriverManager.getConnection("jdbc:mysql://localhost:3306/test?user=root")
     | }

scala> def convertRS(rs: ResultSet) = (rs.getInt(1), rs.getString(2))

scala> val result = new JdbcRDD(sc, getConnection, "SELECT * FROM emp WHERE ? <= empid AND empid <= ?", 0, 20, 2, mapRow = convertRS)
scala> result.collect().toList
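For the driver class to load, the MySQL connector jar has to be on the shell's classpath; the jar path below is just an example for wherever your connector lives:

$ bin/spark-shell --jars /root/mysql-connector-java-5.1.36-bin.jar --driver-class-path /root/mysql-connector-java-5.1.36-bin.jar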

Apache Spark : Reading and Writing Sequence Files

Writing a Sequence file:
scala> val data = sc.parallelize(List(("key1", 1), ("Kay2", 2), ("Key3", 2)))
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at :27
scala> data.saveAsSequenceFile("/tmp/seq-output")

The output can be verified using the hadoop fs -ls command:
[root@maprdemo sample-data]# hadoop fs -lsr /tmp/seq-output
lsr: DEPRECATED: Please use 'ls -R' instead.
-rwxr-xr-x   1 root root          0 2015-10-02 01:12 /tmp/seq-output/_SUCCESS
-rw-r--r--   1 root root        102 2015-10-02 01:12 /tmp/seq-output/part-00000
-rw-r--r--   1 root root        119 2015-10-02 01:12 /tmp/seq-output/part-00001
[root@maprdemo sample-data]# hadoop fs -text /tmp/seq-output/part-00001
Kay2 2
Key3 2

Reading a Sequence file:

scala> import org.apache.hadoop.io.Text
scala> import org.apache.hadoop.io.IntWritable

scala> val result = sc.sequenceFile("/tmp/seq-output/part-00001", classOf[Text], classOf[IntWritable]). map{case (x, y) => (x.toString, y.get())}
result: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[15] at map at :29

scala> result.collect
res14: Array[(String, Int)] = Array((Kay2,2), (Key3,2))
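To read every part file in one go, point sequenceFile at the output directory (or a glob) instead of a single part file:

scala> val all = sc.sequenceFile("/tmp/seq-output", classOf[Text], classOf[IntWritable]).map{case (x, y) => (x.toString, y.get())}
scala> all.collect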

Thursday, October 1, 2015

Apache Spark and HBase Integration on MapR Distribution

Env: Spark-1.4.1,HBase 0.98.12 on MapR-5.0

Problems: you may see errors like the following
*java.lang.RuntimeException: Error occurred while instantiating com.mapr.fs.hbase.MapRTableMappingRules
*java.lang.ClassNotFoundException: org.apache.hadoop.hbase.client.mapr.BaseTableMappingRules
*error: object HBaseAdmin is not a member of package org.apache.hadoop.hbase.client
*error: object HBaseConfiguration is not a member of package org.apache.hadoop.hbase

To resolve these issues and submit a job from spark-shell, you need to change a few things.

1. Edit spark-defaults.conf and add spark.executor.extraClassPath as follows:

spark.executor.extraClassPath /opt/mapr/hbase/hbase-0.98.12/lib/hbase-client-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-common-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-hadoop2-compat-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-it-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-protocol-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-server-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/htrace-core-2.04.jar:/opt/mapr/lib/mapr-hbase-5.0.0-mapr.jar

2. Start the spark-shell with --driver-class-path as follows:
bin/spark-shell --driver-class-path /opt/mapr/hbase/hbase-0.98.12/lib/hbase-client-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-common-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-hadoop2-compat-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-it-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-protocol-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-server-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/htrace-core-2.04.jar:/opt/mapr/lib/mapr-hbase-5.0.0-mapr.jar
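With both classpaths in place, a quick sanity check from the shell is to list the HBase tables (just a sketch; it assumes the cluster's HBase configuration is picked up by HBaseConfiguration.create()):

scala> import org.apache.hadoop.hbase.HBaseConfiguration
scala> import org.apache.hadoop.hbase.client.HBaseAdmin
scala> val admin = new HBaseAdmin(HBaseConfiguration.create())
scala> admin.listTables().foreach(t => println(t.getNameAsString))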

Utility to create lots of parquet files using hive

Create the table in Hive:
CREATE TABLE partition_table_par(id INT, username string)
 PARTITIONED BY(year STRING, month STRING, day STRING, eventtype STRING, varfunction STRING, varname STRING)
 STORED AS PARQUET;

Bash script to pump data into the table, which Hive stores as Parquet files:

#!/bin/bash
for i in {1..100}
do
  echo $i
  year=`expr $i + 1996`
  month=`expr $i % 12`
  day=`expr $i % 30`
  # sample values for the remaining partition columns
  eventtype="click"
  varfunction="sum"
  varname="var$i"

  hive -e "insert into table partition_table_par partition(year='$year',month='$month',day='$day',eventtype='$eventtype',varfunction='$varfunction',varname='$varname') select 1,'$year' from test_table limit 1;"
  #sleep 1m
done
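To verify that the partitions (and the Parquet files underneath them) are being created, something like the following works (the warehouse path is the Hive default; adjust it for your setup):

hive -e "show partitions partition_table_par;"
hadoop fs -ls -R /user/hive/warehouse/partition_table_par | head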