Friday, October 23, 2015
Spark Streaming Sample program using scala

mkdir spark-streaming-example
cd spark-streaming-example/
mkdir -p src/main/scala
cd src/main/scala
vim TestStreaming.scala

Add the following code to TestStreaming.scala:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds
import org.apache.spark.SparkConf

object TestStreaming {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Streaming Test")
    val ssc = new StreamingContext(conf, Seconds(30))
    val lines = ssc.socketTextStream("hostname", 6002)
    val errorLines = lines.filter(_.contains("error"))
    lines.print()
    errorLines.print()
    //errorLines.saveAsTextFiles("errorline.txt")
    println("starting streaming context")
    ssc.start()
    println("awaiting termination")
    ssc.awaitTermination()
    println("App done")
  }
}

cd -
vim build.sbt

name := "Spark Streaming Example"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.1",
  "org.apache.spark" %% "spark-streaming" % "1.4.1"
)

Now run sbt package from the project home; it builds a jar at target/scala-2.10/spark-streaming-example_2.10-1.0.jar.

Run the jar using spark-submit:

#bin/spark-submit --class TestStreaming target/scala-2.10/spark-streaming-example_2.10-1.0.jar

To test the program, open a different terminal and run:

nc -lk `hostname` 6002

Hit enter and type anything on that console; the lines will be displayed on the Spark console.
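When testing this locally (for example with --master local), keep in mind that Spark Streaming needs at least two threads: one for the socket receiver and one for processing the batches. A minimal sketch of the conf change, assuming a local test run only (on a cluster, leave the master to spark-submit):

// Local-testing variant only: local[2] gives one thread to the receiver and one to the batches.
val conf = new SparkConf()
  .setAppName("Streaming Test")
  .setMaster("local[2]")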
Spark-SQL : How to query a csv file
$/spark-1.4.1/bin/spark-shell --packages com.databricks:spark-csv_2.10:1.1.0

scala> sqlContext
res0: org.apache.spark.sql.SQLContext = org.apache.spark.sql.hive.HiveContext@12c57a43

scala> val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "file:///root/emp.csv", "header" -> "true"))
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
df: org.apache.spark.sql.DataFrame = [emp_id: string, emp_name: string, country: string, salary: string]

scala> df.printSchema()
root
 |-- emp_id: string (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- country: string (nullable = true)
 |-- salary: string (nullable = true)

scala> df.registerTempTable("emp")

scala> val names = sqlContext.sql("select emp_name from emp")
names: org.apache.spark.sql.DataFrame = [emp_name: string]

scala> names.foreach(println)
[Guy]
[Jonas]
[Hector]
[Byron]
[Owen]
[Zachery]
[Alden]
[Akeem]
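Because spark-csv loads every column as a string, numeric columns such as salary need an explicit cast before aggregating. A short sketch against the same emp temp table registered above (run in the same spark-shell session):

// Cast salary to double and aggregate per country.
val avgSalary = sqlContext.sql(
  "select country, avg(cast(salary as double)) as avg_salary " +
  "from emp group by country")
avgSalary.show()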
Thursday, October 22, 2015
Spark-SQL : How to query JSON files
scala> sqlContext
res0: org.apache.spark.sql.SQLContext = org.apache.spark.sql.hive.HiveContext@330305d3

scala> val df = sqlContext.load("org.apache.spark.sql.json", Map("path" -> "file:///employee.json"))
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
df: org.apache.spark.sql.DataFrame = [birth_date: string, department_id: bigint, education_level: string, employee_id: bigint, end_date: string, first_name: string, full_name: string, gender: string, hire_date: string, last_name: string, management_role: string, marital_status: string, position_id: bigint, position_title: string, salary: double, store_id: bigint, supervisor_id: bigint]

scala> df.printSchema()
root
 |-- birth_date: string (nullable = true)
 |-- department_id: long (nullable = true)
 |-- education_level: string (nullable = true)
 |-- employee_id: long (nullable = true)
 |-- end_date: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- full_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- hire_date: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- management_role: string (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- position_id: long (nullable = true)
 |-- position_title: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- store_id: long (nullable = true)
 |-- supervisor_id: long (nullable = true)

scala> df.registerTempTable("employee")

scala> val names = sqlContext.sql("select first_name from employee limit 5")
names: org.apache.spark.sql.DataFrame = [first_name: string]

scala> names.foreach(println)
[Sheri]
[Derrick]
[Michael]
[Maya]
[Roberta]
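The registered temp table works for aggregate queries as well. A short sketch for the same spark-shell session (column names taken from the schema above):

// Average salary and head count per gender.
val avgByGender = sqlContext.sql(
  "select gender, avg(salary) as avg_salary, count(*) as cnt " +
  "from employee group by gender")
avgByGender.show()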
Monday, October 19, 2015
Apache Drill - JDBC Storage Plugin Configuration
To query a MySQL data source from Apache Drill, open the Drill web UI and create a new storage plugin with the following configuration:
{
  "type": "jdbc",
  "driver": "com.mysql.jdbc.Driver",
  "url": "jdbc:mysql://hostname:3306/dbname",
  "username": "root",
  "password": "password",
  "enabled": true
}

That's it; you are now all set to query MySQL from Apache Drill:
select * from plugin_name.table_name;
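Drill can also be queried from code over its own JDBC driver. Below is a rough Scala sketch, assuming the drill-jdbc-all jar is on the classpath, a ZooKeeper quorum at hostname:2181, and the placeholder plugin and table names used above; adjust all of these to your environment:

import java.sql.DriverManager

// Connect through Drill's JDBC driver; URL format: jdbc:drill:zk=<zk-host>:<port>
Class.forName("org.apache.drill.jdbc.Driver")
val conn = DriverManager.getConnection("jdbc:drill:zk=hostname:2181")
val stmt = conn.createStatement()
val rs = stmt.executeQuery("select * from plugin_name.table_name limit 10")
while (rs.next()) {
  println(rs.getString(1))
}
rs.close(); stmt.close(); conn.close()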
Sunday, October 11, 2015
Apache Spark : Setup Eclipse (using maven) to Build Spark-Scala project
1. Create a Maven Scala quickstart project:
mvn archetype:generate -B \
  -DarchetypeGroupId=pl.org.miki \
  -DarchetypeArtifactId=scala-quickstart-archetype \
  -DarchetypeVersion=0.8.2 \
  -DgroupId=com.example \
  -DartifactId=spark-project \
  -Dversion=1.0 \
  -Dpackage=com.example.project \
  -DsourceFolders=scala-only
2. Import spark-project into Eclipse using the Maven project import wizard.
3. Update pom.xml as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.example</groupId>
  <artifactId>spark-project</artifactId>
  <version>1.0</version>
  <packaging>jar</packaging>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.7</java.version>
    <scala.version>2.11.2</scala.version>
    <scala.version.tools>2.11</scala.version.tools>
  </properties>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.0</version>
        <executions>
          <execution>
            <!-- compile Scala before maven-compiler-plugin runs so we don't end with a compile error -->
            <phase>process-sources</phase>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>${java.version}</source>
          <target>${java.version}</target>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4.1</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_${scala.version.tools}</artifactId>
      <version>1.11.4</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.version.tools}</artifactId>
      <version>2.2.0</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <!-- match the Spark artifact to the Scala 2.11 build declared above -->
      <artifactId>spark-core_${scala.version.tools}</artifactId>
      <version>1.4.1</version>
    </dependency>
  </dependencies>
</project>
4. Add the sample Scala program to src/main/scala (the sample is taken from the Apache Spark website):
package com.example.project

/**
 * @author rsingh
 */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "/root/test.txt" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}
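To run SimpleApp directly from Eclipse while developing (instead of packaging and submitting every time), the master can be set in code. A small sketch, assuming this is only used during development and removed again before building the jar for the cluster:

// Local-development variant: local[*] uses all cores of the workstation.
val conf = new SparkConf()
  .setAppName("Simple Application")
  .setMaster("local[*]")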
5. Now build the project using the Maven package command:
mvn clean package
6. The above command creates a fat jar in the target directory; run it using spark-submit:
$bin/spark-submit --class com.example.project.SimpleApp --master spark://maprdemo:7077 /root/spark-project-1.0-jar-with-dependencies.jar
Friday, October 2, 2015
Apache Spark : JDBC connectivity with MySQL
ENV: Apache Spark-1.4.1 with spark-shell (the MySQL JDBC connector jar must be on the shell classpath, e.g. added via --jars)
scala> import java.sql.{DriverManager, ResultSet}
import java.sql.{DriverManager, ResultSet}

scala> import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.rdd.JdbcRDD

scala> def getConnection() = {
     |   Class.forName("com.mysql.jdbc.Driver").newInstance()
     |   DriverManager.getConnection("jdbc:mysql://localhost:3306/test?user=root")
     | }

scala> def convertRS(rs: ResultSet) = {
     |   (rs.getInt(1), rs.getString(2))
     | }

scala> // the two ? placeholders bind to the lower and upper empid bounds (0 and 20)
scala> val result = new JdbcRDD(sc, getConnection, "SELECT * FROM emp WHERE ? <= empid AND empid <= ?", 0, 20, 2, mapRow = convertRS)

scala> result.collect().toList
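On Spark 1.4.x the DataFrame JDBC reader is a simpler alternative to JdbcRDD. A minimal sketch, assuming the same MySQL connector jar on the classpath and the same test database, table, and credentials as above:

import java.util.Properties

val props = new Properties()
props.setProperty("user", "root")

// Read the emp table through the JDBC data source into a DataFrame.
val empDF = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/test", "emp", props)
empDF.printSchema()
empDF.show()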
Apache Spark : Reading and Writing Sequence Files
Writing a Sequence file:
scala> val data = sc.parallelize(List(("key1", 1), ("Kay2", 2), ("Key3", 2)))
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:27

scala> data.saveAsSequenceFile("/tmp/seq-output")
The output can be verified using the hadoop fs -ls command:
[root@maprdemo sample-data]# hadoop fs -lsr /tmp/seq-output
lsr: DEPRECATED: Please use 'ls -R' instead.
-rwxr-xr-x 1 root root   0 2015-10-02 01:12 /tmp/seq-output/_SUCCESS
-rw-r--r-- 1 root root 102 2015-10-02 01:12 /tmp/seq-output/part-00000
-rw-r--r-- 1 root root 119 2015-10-02 01:12 /tmp/seq-output/part-00001
[root@maprdemo sample-data]# hadoop fs -text /tmp/seq-output/part-00001
Kay2 2
Key3 2
Reading a Sequence file:
scala> import org.apache.hadoop.io.Text
import org.apache.hadoop.io.Text

scala> import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.IntWritable

scala> val result = sc.sequenceFile("/tmp/seq-output/part-00001", classOf[Text], classOf[IntWritable]).
     |   map{case (x, y) => (x.toString, y.get())}
result: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[15] at map at <console>:29

scala> result.collect
res14: Array[(String, Int)] = Array((Kay2,2), (Key3,2))
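saveAsSequenceFile also accepts an optional compression codec. A small sketch using the same data RDD as above (Gzip is chosen only as an example; any Hadoop CompressionCodec on the classpath works):

import org.apache.hadoop.io.compress.GzipCodec

// Write the same pairs as a Gzip-compressed sequence file to a new path.
data.saveAsSequenceFile("/tmp/seq-output-gz", Some(classOf[GzipCodec]))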
Thursday, October 1, 2015
Apache Spark and HBase Integration on MapR Distribution
Env: Spark-1.4.1, HBase 0.98.12 on MapR-5.0
Problems: you may see errors like the following:
*java.lang.RuntimeException: Error occurred while instantiating com.mapr.fs.hbase.MapRTableMappingRules
*java.lang.ClassNotFoundException: org.apache.hadoop.hbase.client.mapr.BaseTableMappingRules
*error: object HBaseAdmin is not a member of package org.apache.hadoop.hbase.client
*error: object HBaseConfiguration is not a member of package org.apache.hadoop.hbase
To resolve these issues and submit a job from spark-shell, you need to change a few things:
1. Edit spark-defaults.conf and add spark.executor.extraClassPath as follows:
spark.executor.extraClassPath /opt/mapr/hbase/hbase-0.98.12/lib/hbase-client-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-common-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-hadoop2-compat-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-it-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-protocol-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-server-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/htrace-core-2.04.jar:/opt/mapr/lib/mapr-hbase-5.0.0-mapr.jar
2. Start the spark-shell with --driver-class-path as follows:
bin/spark-shell --driver-class-path /opt/mapr/hbase/hbase-0.98.12/lib/hbase-client-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-common-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-hadoop2-compat-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-it-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-protocol-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/hbase-server-0.98.12-mapr-1506.jar:/opt/mapr/hbase/hbase-0.98.12/lib/htrace-core-2.04.jar:/opt/mapr/lib/mapr-hbase-5.0.0-mapr.jar
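With both classpaths in place, a quick way to confirm the integration from spark-shell is to list the HBase tables. A minimal sketch using the HBase 0.98 client API (table names and any MapR table paths are environment specific):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HBaseAdmin

// Build an HBase configuration from the files on the classpath and list the tables.
val hbaseConf = HBaseConfiguration.create()
val admin = new HBaseAdmin(hbaseConf)
admin.listTables().foreach(t => println(t.getNameAsString))
admin.close()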
Utility to create lots of parquet files using hive
Create the table in Hive:
CREATE TABLE partition_table_par(id INT, username STRING)
PARTITIONED BY(year STRING, month STRING, day STRING, eventtype STRING, varfunction STRING, varname STRING)
STORED AS PARQUET;
Bash script to pump data into the table, which stores it as parquet files:
#!/bin/bash
# Insert one row into a new partition on each iteration; every partition
# produces its own parquet file under the table directory.
for i in {1..100}
do
  echo $i
  year=`expr $i + 1996`
  yy=\'$year\'
  echo $yy
  month=`expr $i % 12`
  mm=\'$month\'
  day=`expr $i % 30`
  dd=\'$day\'
  eventtype=eventtype$i
  et=\'$eventtype\'
  varfunction=varfunction$i
  vf=\'$varfunction\'
  varname=varname$i
  v=\'$varname\'
  hive -e 'insert into table partition_table_par PARTITION(year='$yy',month='$mm',day='$dd',eventtype='$et',varfunction='$vf',varname='$v') select 1,'$yy' from test_table limit 1;'
  #sleep 1m
done
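After the script finishes, the generated parquet partitions can be inspected from spark-shell. A quick sketch, assuming the table data lives under the default Hive warehouse path /user/hive/warehouse (adjust to your metastore configuration):

// Hypothetical warehouse path; Spark's parquet reader discovers the year=/month=/... partition directories.
val parquetDF = sqlContext.read.parquet("/user/hive/warehouse/partition_table_par")
parquetDF.printSchema()
println(parquetDF.count())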