Friday, October 23, 2015

Spark Streaming sample program using Scala

mkdir spark-streaming-example
cd spark-streaming-example/
mkdir -p src/main/scala
cd src/main/scala
vim TestStreaming.scala

Add the following code to TestStreaming.scala:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds
import org.apache.spark.SparkConf

object TestStreaming {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Streaming Test")
    // 30-second batch interval
    val ssc = new StreamingContext(conf, Seconds(30))
    // replace "hostname" with the host where nc is listening
    val lines = ssc.socketTextStream("hostname", 6002)
    val errorLines = lines.filter(_.contains("error"))
    lines.print()
    errorLines.print()
    //errorLines.saveAsTextFiles("errorline.txt")
    println("starting streaming context")
    ssc.start()
    println("awaiting termination")
    ssc.awaitTermination()
    println("App done")
  }
}

cd -
vim build.sbt
name := "Spark Streaming Example"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.1",
  "org.apache.spark" %% "spark-streaming" % "1.4.1"
)

* Now run sbt package from the project home; it builds the jar at target/scala-2.10/spark-streaming-example_2.10-1.0.jar

* Run the jar using spark-submit:
#bin/spark-submit --class TestStreaming target/scala-2.10/spark-streaming-example_2.10-1.0.jar

To test the program, open a different terminal and run nc -lk `hostname` 6002, then type anything on that console; the lines will be displayed on the Spark console.
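As an optional extension (not in the original program), a sliding-window count of the error lines could be added before ssc.start(); a minimal sketch, assuming the 30-second batch interval used above:

// Hypothetical addition to TestStreaming: count error lines over a sliding
// 60-second window, evaluated every 30 seconds (both multiples of the batch interval).
val windowedErrorCount = errorLines.window(Seconds(60), Seconds(30)).count()
windowedErrorCount.print()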

Spark-SQL : How to query a CSV file

$/spark-1.4.1/bin/spark-shell --packages com.databricks:spark-csv_2.10:1.1.0
scala> sqlContext
res0: org.apache.spark.sql.SQLContext = org.apache.spark.sql.hive.HiveContext@12c57a43

scala> val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "file:///root/emp.csv","header"->"true"))
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
df: org.apache.spark.sql.DataFrame = [emp_id: string, emp_name: string, country: string, salary: string]

scala>   df.printSchema()
root
 |-- emp_id: string (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- country: string (nullable = true)
 |-- salary: string (nullable = true)


scala> df.registerTempTable("emp")

scala> val names = sqlContext.sql("select emp_name from emp")
names: org.apache.spark.sql.DataFrame = [emp_name: string]

scala> names.foreach(println)
[Guy]
[Jonas]
[Hector]
[Byron]
[Owen]
[Zachery]
[Alden]
[Akeem]
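Note that sqlContext.load is deprecated in Spark 1.4 (hence the warning above). A minimal equivalent using the DataFrameReader API, assuming the same file path and the spark-csv package on the classpath:

// Non-deprecated way to build the same DataFrame on Spark 1.4+
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("file:///root/emp.csv")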

Thursday, October 22, 2015

Spark-SQL : How to query JSON files

scala> sqlContext
res0: org.apache.spark.sql.SQLContext = org.apache.spark.sql.hive.HiveContext@330305d3

scala> val df = sqlContext.load("org.apache.spark.sql.json", Map("path" -> "file:///employee.json"))
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
df: org.apache.spark.sql.DataFrame = [birth_date: string, department_id: bigint, education_level: string, employee_id: bigint, end_date: string, first_name: string, full_name: string, gender: string, hire_date: string, last_name: string, management_role: string, marital_status: string, position_id: bigint, position_title: string, salary: double, store_id: bigint, supervisor_id: bigint]

scala>  df.printSchema()
root
 |-- birth_date: string (nullable = true)
 |-- department_id: long (nullable = true)
 |-- education_level: string (nullable = true)
 |-- employee_id: long (nullable = true)
 |-- end_date: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- full_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- hire_date: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- management_role: string (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- position_id: long (nullable = true)
 |-- position_title: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- store_id: long (nullable = true)
 |-- supervisor_id: long (nullable = true)


scala> df.registerTempTable("employee")
scala> val names = sqlContext.sql("select first_name from employee limit 5")
names: org.apache.spark.sql.DataFrame = [first_name: string]
scala> names.foreach(println)
[Sheri]
[Derrick]
[Michael]
[Maya]
[Roberta]
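As with the CSV example, the deprecated sqlContext.load call can be replaced with the DataFrameReader API; a minimal sketch, assuming the same file path:

// Non-deprecated way to load the same JSON file on Spark 1.4+
val df = sqlContext.read.json("file:///employee.json")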


Monday, October 19, 2015

Apache Drill - JDBC Storage Plugin Configuration

To query a MySQL data source from Apache Drill, open the Drill web UI and create a new storage plugin with the following configuration:
{
  "type": "jdbc",
  "driver": "com.mysql.jdbc.Driver",
  "url": "jdbc:mysql://hostname:3306/dbname",
  "username": "root",
  "password": "password",
  "enabled": true
}
That's it; you are now all set to query MySQL from Apache Drill:
select * from plugin_name.table_name;

Sunday, October 11, 2015

Apache Spark : Set up Eclipse (using Maven) to build a Spark-Scala project

1. Create a Maven Scala quickstart project:
mvn archetype:generate -B  -DarchetypeGroupId=pl.org.miki -DarchetypeArtifactId=scala-quickstart-archetype -DarchetypeVersion=0.8.2  -DgroupId=com.example -DartifactId=spark-project -Dversion=1.0 -Dpackage=com.example.project  -DsourceFolders=scala-only


2. Import spark-project into Eclipse using the Maven project import wizard.
3. Update pom.xml as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.example</groupId>
<artifactId>spark-project</artifactId>
<version>1.0</version>
<packaging>jar</packaging>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.7</java.version>
<scala.version>2.11.2</scala.version>
<scala.version.tools>2.11</scala.version.tools>
</properties>

<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<!-- this is so we don't end with a compile error in maven-compiler-plugin -->     
<phase>process-sources</phase>      
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4.1</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.version.tools}</artifactId>
<version>1.11.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.version.tools}</artifactId>
<version>2.2.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<!-- match the Scala version declared in the properties above -->
<artifactId>spark-core_${scala.version.tools}</artifactId>
<version>1.4.1</version>
</dependency>
</dependencies>
</project>

4. Add the sample Scala program to src/main/scala (I have taken the sample from the Apache Spark website):
package com.example.project

/**
 * @author rsingh
 */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "/root/test.txt" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

5. Now build the project using the Maven package command:
mvn clean package

6. The above command will create a fat jar in the target directory; run it using spark-submit:
$bin/spark-submit --class com.example.project.SimpleApp --master spark://maprdemo:7077 /root/spark-project-1.0-jar-with-dependencies.jar

Friday, October 2, 2015

Apache Spark : JDBC connectivity with MySQL

ENV: Apache Spark-1.4.1 with spark-shell
Note: the MySQL JDBC driver jar must be on the spark-shell classpath (for example via --driver-class-path).

scala> import java.sql.{Connection, DriverManager, ResultSet}
scala> import org.apache.spark.rdd.JdbcRDD
scala> def getConnection(): Connection = {
     |   Class.forName("com.mysql.jdbc.Driver").newInstance()
     |   DriverManager.getConnection("jdbc:mysql://localhost:3306/test?user=root")
     | }
scala> def convertRS(rs: ResultSet) = (rs.getInt(1), rs.getString(2))
scala> val result = new JdbcRDD(sc, getConnection _, "SELECT * FROM emp WHERE ? <= empid AND empid <= ?", 0, 20, 2, mapRow = convertRS _)
scala> result.collect().toList
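
On Spark 1.4 the same table can also be read through the DataFrame JDBC reader instead of JdbcRDD; a minimal sketch, assuming the same MySQL URL and emp table:

import java.util.Properties

val props = new Properties()
props.setProperty("user", "root")

// Read the whole emp table as a DataFrame and print a few rows
val empDF = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/test", "emp", props)
empDF.show()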

Apache Spark : Reading and Writing Sequence Files

Writing a Sequence file:
scala> val data = sc.parallelize(List(("key1", 1), ("Kay2", 2), ("Key3", 2)))
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:27
scala> data.saveAsSequenceFile("/tmp/seq-output")

The output can be verified using the hadoop fs command:
[root@maprdemo sample-data]# hadoop fs -lsr /tmp/seq-output
lsr: DEPRECATED: Please use 'ls -R' instead.
-rwxr-xr-x   1 root root          0 2015-10-02 01:12 /tmp/seq-output/_SUCCESS
-rw-r--r--   1 root root        102 2015-10-02 01:12 /tmp/seq-output/part-00000
-rw-r--r--   1 root root        119 2015-10-02 01:12 /tmp/seq-output/part-00001
[root@maprdemo sample-data]# hadoop fs -text /tmp/seq-output/part-00001
Kay2 2
Key3 2

Reading a Sequence file:

scala> import org.apache.hadoop.io.Text
import org.apache.hadoop.io.Text
scala> import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.IntWritable
scala> val result = sc.sequenceFile("/tmp/seq-output/part-00001", classOf[Text], classOf[IntWritable]). map{case (x, y) => (x.toString, y.get())}
result: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[15] at map at <console>:29

scala> result.collect
res14: Array[(String, Int)] = Array((Kay2,2), (Key3,2))
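
The same call also works on the whole output directory instead of a single part file; a minimal sketch, assuming the /tmp/seq-output path written above:

import org.apache.hadoop.io.{IntWritable, Text}

// Read every part file under the output directory
val all = sc.sequenceFile("/tmp/seq-output", classOf[Text], classOf[IntWritable])
  .map { case (k, v) => (k.toString, v.get()) }
all.collect().foreach(println)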

Thursday, October 1, 2015

Apache Spark and HBase Integration on MapR Distribution

Env: Spark-1.4.1, HBase 0.98.12 on MapR-5.0

Problems: you may see errors like these:
*java.lang.RuntimeException: Error occurred while instantiating com.mapr.fs.hbase.MapRTableMappingRules
*java.lang.ClassNotFoundException: org.apache.hadoop.hbase.client.mapr.BaseTableMappingRules
*error: object HBaseAdmin is not a member of package org.apache.hadoop.hbase.client
*error: object HBaseConfiguration is not a member of package org.apache.hadoop.hbase

To resolve these issues and submit a job from spark-shell, you need to change a few things.

1. Edit spark-defaults.conf and add spark.executor.extraClassPath as follows:

spark.executor.extraClassPath /opt/mapr/hbase/hbase-0.98.12/lib/hbase-client-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-common-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-hadoop2-compat-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-it-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-protocol-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-server-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/htrace-core-2.04.jar:/opt/mapr/lib/mapr-hbase-5.0.0-mapr.jar

2. Start the spark-shell with --driver-class-path as follows:
bin/spark-shell --driver-class-path /opt/mapr/hbase/hbase-0.98.12/lib/hbase-client-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-common-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-hadoop2-compat-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-it-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-protocol-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-server-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/htrace-core-2.04.jar:/opt/mapr/lib/mapr-hbase-5.0.0-mapr.jar
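
With the classpath in place, a quick smoke test from spark-shell confirms that the HBase classes now resolve; a minimal sketch (not from the original post), assuming the HBase 0.98 client API and a reachable cluster:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HBaseAdmin

// If the classpath is set up correctly, these classes load and the table names print
val hbaseConf = HBaseConfiguration.create()
val admin = new HBaseAdmin(hbaseConf)
admin.listTableNames().foreach(println)
admin.close()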



Utility to create lots of Parquet files using Hive

Create the table in Hive:
CREATE TABLE partition_table_par(id INT, username string)
 PARTITIONED BY(year STRING, month STRING,day STRING,eventtype STRING,varfunction STRING,varname STRING)
 STORED AS PARQUET;

Bash script to pump data into the table, which stores it as Parquet files:
#!/bin/bash

# Generates 100 partitions; each variable is wrapped in escaped single quotes
# so it arrives quoted inside the HiveQL statement below.
for i in {1..100}
do
  echo $i
  year=`expr $i + 1996`
  yy=\'$year\'
  echo $yy
  month=`expr $i % 12`
  mm=\'$month\'
  day=`expr $i % 30`
  dd=\'$day\'
  eventtype=eventtype$i
  et=\'$eventtype\'
  varfunction=varfunction$i
  vf=\'$varfunction\'
  varname=varname$i
  v=\'$varname\'

  # One static partition per iteration; each insert writes a Parquet file
  hive -e 'insert into table partition_table_par PARTITION(year='$yy',month='$mm',day='$dd',eventtype='$et',varfunction='$vf',varname='$v') select 1,'$yy' from test_table limit 1;'
  #sleep 1m
done