mkdir spark-streaming-example
cd spark-streaming-example/
mkdir -p src/main/scala
cd src/main/scala
vim TestStreaming.scala
Add the following code to TestStreaming.scala:
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds
import org.apache.spark.SparkConf
object TestStreaming {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Streaming Test")
    val ssc = new StreamingContext(conf, Seconds(30))
    val lines = ssc.socketTextStream("hostname", 6002)
    val errorLines = lines.filter(_.contains("error"))
    lines.print()
    errorLines.print()
    //errorLines.saveAsTextFiles("errorline.txt")
    println("starting streaming context")
    ssc.start()
    println("awaiting termination")
    ssc.awaitTermination()
    println("App done")
  }
}
cd -
vim build.sbt
name := "Spark Streaming Example"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.1",
  "org.apache.spark" %% "spark-streaming" % "1.4.1"
)
* Now run sbt package from the project home; it will build the jar target/scala-2.10/spark-streaming-example_2.10-1.0.jar
* Run this jar using spark-submit:
#bin/spark-submit --class TestStreaming target/scala-2.10/spark-streaming-example_2.10-1.0.jar
To test this program, open a different terminal and run nc -lk `hostname` 6002, hit enter, and type anything on the console; the lines will be displayed on the Spark console.
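For a quick local test without a cluster, the same driver can be run with a local master and the matched lines persisted instead of only printed. This is a minimal sketch, not from the original program: local[2], localhost:6002 and the /tmp/errorlines output prefix are assumptions for local testing.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TestStreamingLocal {
  def main(args: Array[String]) {
    // local[2]: one thread for the socket receiver, one for batch processing
    val conf = new SparkConf().setAppName("Streaming Test (local)").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(30))
    val lines = ssc.socketTextStream("localhost", 6002)
    // keep only lines containing "error" and write each batch under /tmp/errorlines-<batch time>
    lines.filter(_.contains("error")).saveAsTextFiles("/tmp/errorlines")
    ssc.start()
    ssc.awaitTermination()
  }
}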
Friday, October 23, 2015
Spark Streaming Sample program using Scala
Spark-SQL : How to query a CSV file
$ /spark-1.4.1/bin/spark-shell --packages com.databricks:spark-csv_2.10:1.1.0
scala> sqlContext
res0: org.apache.spark.sql.SQLContext = org.apache.spark.sql.hive.HiveContext@12c57a43
scala> val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "file:///root/emp.csv","header"->"true"))
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
df: org.apache.spark.sql.DataFrame = [emp_id: string, emp_name: string, country: string, salary: string]
scala> df.printSchema()
root
|-- emp_id: string (nullable = true)
|-- emp_name: string (nullable = true)
|-- country: string (nullable = true)
|-- salary: string (nullable = true)
scala> df.registerTempTable("emp")
scala> val names = sqlContext.sql("select emp_name from emp")
names: org.apache.spark.sql.DataFrame = [emp_name: string]
scala> names.foreach(println)
[Guy]
[Jonas]
[Hector]
[Byron]
[Owen]
[Zachery]
[Alden]
[Akeem]
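The load call above prints a deprecation warning in Spark 1.4; the DataFrameReader API does the same thing without it. A minimal sketch assuming the same file and header option (df2 and emp2 are just alternate names so they do not clash with the session above):

// non-deprecated equivalent of sqlContext.load(...) for the spark-csv source
val df2 = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("file:///root/emp.csv")
df2.registerTempTable("emp2")
sqlContext.sql("select emp_name from emp2").show()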
Thursday, October 22, 2015
Spark-SQL : How to query JSON files
scala> sqlContext
res0: org.apache.spark.sql.SQLContext = org.apache.spark.sql.hive.HiveContext@330305d3
scala> val df = sqlContext.load("org.apache.spark.sql.json", Map("path" -> "file:///employee.json"))
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
df: org.apache.spark.sql.DataFrame = [birth_date: string, department_id: bigint, education_level: string, employee_id: bigint, end_date: string, first_name: string, full_name: string, gender: string, hire_date: string, last_name: string, management_role: string, marital_status: string, position_id: bigint, position_title: string, salary: double, store_id: bigint, supervisor_id: bigint]
scala> df.printSchema()
root
|-- birth_date: string (nullable = true)
|-- department_id: long (nullable = true)
|-- education_level: string (nullable = true)
|-- employee_id: long (nullable = true)
|-- end_date: string (nullable = true)
|-- first_name: string (nullable = true)
|-- full_name: string (nullable = true)
|-- gender: string (nullable = true)
|-- hire_date: string (nullable = true)
|-- last_name: string (nullable = true)
|-- management_role: string (nullable = true)
|-- marital_status: string (nullable = true)
|-- position_id: long (nullable = true)
|-- position_title: string (nullable = true)
|-- salary: double (nullable = true)
|-- store_id: long (nullable = true)
|-- supervisor_id: long (nullable = true)
scala> df.registerTempTable("employee")
scala> val names = sqlContext.sql("select first_name from employee limit 5")
names: org.apache.spark.sql.DataFrame = [first_name: string]
scala> names.foreach(println)
[Sheri]
[Derrick]
[Michael]
[Maya]
[Roberta]
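As with the CSV example, the deprecated load call can be replaced with the DataFrameReader API. A minimal sketch assuming the same employee.json path (df2 and employee2 are alternate names):

// non-deprecated equivalent using the built-in JSON source
val df2 = sqlContext.read.json("file:///employee.json")
df2.registerTempTable("employee2")
sqlContext.sql("select first_name, salary from employee2 limit 5").show()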
Monday, October 19, 2015
Apache Drill - JDBC Storage Plugin Configuration
To query a MySQL data source from Apache Drill, create a new storage plugin in the Drill web UI with the following configuration:
{
"type": "jdbc",
"driver": "com.mysql.jdbc.Driver",
"url": "jdbc:mysql://hostname:3306/dbname",
"username": "root",
"password": "password",
"enabled": true
}
That's it, you are now all set to query MySQL from Apache Drill:
select * from plugin_name.table_name;
Sunday, October 11, 2015
Apache Spark : Setup Eclipse (using maven) to Build Spark-Scala project
1. Create a maven Scala quick start project:
mvn archetype:generate -B -DarchetypeGroupId=pl.org.miki -DarchetypeArtifactId=scala-quickstart-archetype -DarchetypeVersion=0.8.2 -DgroupId=com.example -DartifactId=spark-project -Dversion=1.0 -Dpackage=com.example.project -DsourceFolders=scala-only
2. Import spark-project into Eclipse using the maven project import wizard.
3. Update pom.xml as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>spark-project</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.7</java.version>
<scala.version>2.11.2</scala.version>
<scala.version.tools>2.11</scala.version.tools>
</properties>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<!-- this is so we don't end with a compile error in maven-compiler-plugin -->
<phase>process-sources</phase>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4.1</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.version.tools}</artifactId>
<version>1.11.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.version.tools}</artifactId>
<version>2.2.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version.tools}</artifactId>
<version>1.4.1</version>
</dependency>
</dependencies>
</project>
4. Add the sample Scala program to src/main/scala (I have taken the sample from the Apache Spark website):
package com.example.project
/**
* @author rsingh
*/
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SimpleApp {
def main(args: Array[String]) {
val logFile = "/root/test.txt" // Should be some file on your system
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile, 2).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
}
}
5. Now build your project using the maven package command:
mvn clean package
6. The above command will create a fat jar in the target directory; run the jar using spark-submit:
$bin/spark-submit --class com.example.project.SimpleApp --master spark://maprdemo:7077 /root/spark-project-1.0-jar-with-dependencies.jar
Friday, October 2, 2015
Apache Spark : JDBC connectivity with MySQL
ENV: Apache Spark-1.4.1 with spark-shell
scala> import java.sql.{DriverManager, ResultSet}
scala> import org.apache.spark.rdd.JdbcRDD
scala> def getConnection() = {
     |   Class.forName("com.mysql.jdbc.Driver").newInstance()
     |   DriverManager.getConnection("jdbc:mysql://localhost:3306/test?user=root")
     | }
scala> def convertRS(rs: ResultSet) = {
     |   (rs.getInt(1), rs.getString(2))
     | }
scala> val result = new JdbcRDD(sc, getConnection, "SELECT * FROM emp WHERE ? <= empid AND empid <= ?", 0, 20, 2, mapRow = convertRS)
scala> result.collect().toList
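On Spark 1.4 the same table can also be read as a DataFrame through the JDBC data source, which avoids writing the connection and row-mapping functions by hand. A minimal sketch assuming the same test database and that the MySQL connector jar is on the classpath (for example via spark-shell --jars):

// DataFrame read of the emp table via the JDBC data source
val empDF = sqlContext.read.format("jdbc").options(Map(
  "url"     -> "jdbc:mysql://localhost:3306/test?user=root",
  "dbtable" -> "emp",
  "driver"  -> "com.mysql.jdbc.Driver"
)).load()
empDF.show()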
Apache Spark : Reading and Writing Sequence Files
Writing a Sequence file:
scala> val data = sc.parallelize(List(("key1", 1), ("Kay2", 2), ("Key3", 2)))
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:27
scala> data.saveAsSequenceFile("/tmp/seq-output")
The output can be verified using hadoop ls command
[root@maprdemo sample-data]# hadoop fs -lsr /tmp/seq-output
lsr: DEPRECATED: Please use 'ls -R' instead.
-rwxr-xr-x 1 root root 0 2015-10-02 01:12 /tmp/seq-output/_SUCCESS
-rw-r--r-- 1 root root 102 2015-10-02 01:12 /tmp/seq-output/part-00000
-rw-r--r-- 1 root root 119 2015-10-02 01:12 /tmp/seq-output/part-00001
[root@maprdemo sample-data]# hadoop fs -text /tmp/seq-output/part-00001
Kay2 2
Key3 2
Reading Sequence file
scala> import org.apache.hadoop.io.Text
import org.apache.hadoop.io.Text
scala> import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.IntWritable
scala> val result = sc.sequenceFile("/tmp/seq-output/part-00001", classOf[Text], classOf[IntWritable]). map{case (x, y) => (x.toString, y.get())}
result: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[15] at map at <console>:29
scala> result.collect
res14: Array[(String, Int)] = Array((Kay2,2), (Key3,2))
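A shorter read is possible with the typed sequenceFile helper, which resolves the Text/IntWritable conversions implicitly; a minimal sketch assuming the same /tmp/seq-output directory:

// typed variant: no explicit Writable classes or map step needed
val typed = sc.sequenceFile[String, Int]("/tmp/seq-output")
typed.collect().foreach(println)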
Thursday, October 1, 2015
Apache Spark and HBase Integration on MapR Distribution
Env: Spark-1.4.1,HBase 0.98.12 on MapR-5.0
Problems: you may see errors like these:
*java.lang.RuntimeException: Error occurred while instantiating com.mapr.fs.hbase.MapRTableMappingRules
*java.lang.ClassNotFoundException: org.apache.hadoop.hbase.client.mapr.BaseTableMappingRules
*error: object HBaseAdmin is not a member of package org.apache.hadoop.hbase.client
*error: object HBaseConfiguration is not a member of package org.apache.hadoop.hbase
To resolve these issues and submit a job from spark-shell, you need to change a few things:
1. Edit spark-defaults.conf and add spark.executor.extraClassPath as follows:
spark.executor.extraClassPath /opt/mapr/hbase/hbase-0.98.12/lib/hbase-client-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-common-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-hadoop2-compat-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-it-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-protocol-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-server-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/htrace-core-2.04.jar:/opt/mapr/lib/mapr-hbase-5.0.0-mapr.jar
2. Start the spark-shell with --driver-class-path as follows:
bin/spark-shell --driver-class-path /opt/mapr/hbase/hbase-0.98.12/lib/hbase-client-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-common-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-hadoop2-compat-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-it-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-protocol-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-server-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/htrace-core-2.04.jar:/opt/mapr/lib/mapr-hbase-5.0.0-mapr.jar
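Once both classpath changes are in place, a quick sanity check from the spark-shell confirms that the HBase and MapR client classes resolve. This is a minimal sketch, not from the original post; it assumes hbase-site.xml is on the classpath and at least one table exists:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HBaseAdmin

// if the classpath is wrong, this is where the errors listed above would surface
val hbaseConf = HBaseConfiguration.create()
val admin = new HBaseAdmin(hbaseConf)
admin.listTableNames().foreach(println)
admin.close()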
Utility to create lots of parquet files using hive
Create a table in Hive:
CREATE TABLE partition_table_par(id INT, username string) PARTITIONED BY(year STRING, month STRING,day STRING,eventtype STRING,varfunction STRING,varname STRING) STORED AS PARQUET;
Bash script to pump data into the table, which will be stored as parquet files:
#!/bin/bash
# Pump rows into partition_table_par: each iteration writes one row into a new
# partition, so every pass of the loop produces a fresh set of parquet files.
for i in {1..100}
do
  echo $i
  year=`expr $i + 1996`
  yy=\'$year\'            # wrap the value in literal single quotes for the HiveQL below
  echo $yy
  month=`expr $i % 12`
  mm=\'$month\'
  day=`expr $i % 30`
  dd=\'$day\'
  eventtype=eventtype$i
  et=\'$eventtype\'
  varfunction=varfunction$i
  vf=\'$varfunction\'
  varname=varname$i
  v=\'$varname\'
  hive -e 'insert into table partition_table_par PARTITION(year='$yy',month='$mm',day='$dd',eventtype='$et',varfunction='$vf',varname='$v') select 1,'$yy' from test_table limit 1;'
  #sleep 1m
done
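The generated files can be read back from Spark to confirm the partitions were written. A minimal sketch, assuming the default Hive warehouse location /user/hive/warehouse (adjust the path for your cluster):

// Spark 1.4 discovers the year/month/day/... partition columns from the directory layout
val parquetDF = sqlContext.read.parquet("/user/hive/warehouse/partition_table_par")
parquetDF.printSchema()
parquetDF.groupBy("year").count().show()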