Friday, October 23, 2015

Spark Streaming Sample Program Using Scala

mkdir spark-streaming-example
cd spark-streaming-example/
mkdir -p src/main/scala
cd src/main/scala
vim TestStreaming.scala

Add the following code to TestStreaming.scala:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds
import org.apache.spark.SparkConf

/**
 * Minimal Spark Streaming job.
 *
 * It connects to a TCP text socket. For every batch, it prints the received
 * lines and then the subset of those lines that contain "error".
 *
 * Usage (all arguments are optional):
 * {{{
 *   TestStreaming [host] [port] [batchSeconds]
 * }}}
 *  - host:         socket host to read from. Defaults to this machine's host
 *                  name, i.e. the host that `nc -lk \`hostname\` 6002` listens on.
 *  - port:         socket port. Defaults to 6002.
 *  - batchSeconds: batch interval in seconds. Defaults to 30.
 */
object TestStreaming {

  private val DefaultPort = 6002
  private val DefaultBatchSeconds = 30

  def main(args: Array[String]): Unit = {
    // Resolve the local host name rather than using a literal "hostname"
    // placeholder, so the job reaches a netcat started with `hostname`.
    val host = args.lift(0).getOrElse(java.net.InetAddress.getLocalHost.getHostName)
    val port = intArg(args, 1, "port", DefaultPort)
    val batchSeconds = intArg(args, 2, "batchSeconds", DefaultBatchSeconds)
    require(port > 0 && port <= 65535, s"port must be in 1..65535, got $port")
    require(batchSeconds > 0, s"batchSeconds must be positive, got $batchSeconds")

    val conf = new SparkConf().setAppName("Streaming Test")
    val ssc = new StreamingContext(conf, Seconds(batchSeconds))

    val lines = ssc.socketTextStream(host, port)
    val errorLines = lines.filter(_.contains("error"))

    // Print every received line, then only the lines containing "error".
    lines.print()
    errorLines.print()
    //errorLines.saveAsTextFiles("errorline.txt")

    println("starting streaming context")
    ssc.start()
    println("awaiting termination")
    // Blocks until the streaming context is stopped.
    ssc.awaitTermination()
    println("App done")
  }

  /**
   * Reads the optional integer argument at `index`.
   *
   * @param args    raw command-line arguments
   * @param index   position of the argument to read
   * @param name    argument name used in the error message
   * @param default value returned when the argument is absent
   * @return the parsed integer, or `default` when `args` has no element at `index`
   * @throws IllegalArgumentException if the argument is present but not an integer
   */
  private def intArg(args: Array[String], index: Int, name: String, default: Int): Int =
    args.lift(index) match {
      case None => default
      case Some(raw) =>
        try raw.trim.toInt
        catch {
          case _: NumberFormatException =>
            throw new IllegalArgumentException(s"$name must be an integer, got '$raw'")
        }
    }
}

cd -
vim build.sbt
// Single source of truth for the Spark version shared by all Spark modules.
val sparkVersion = "1.4.1"

scalaVersion := "2.10.4"

name := "Spark Streaming Example"

version := "1.0"

// %% selects the artifact built for this project's Scala binary version.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion
)

* Now run `sbt package` from the project home. It builds the jar target/scala-2.10/spark-streaming-example_2.10-1.0.jar.

* Run this jar with spark-submit from the project home (where $SPARK_HOME is your Spark installation directory):
$SPARK_HOME/bin/spark-submit --class TestStreaming target/scala-2.10/spark-streaming-example_2.10-1.0.jar

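To test this program, open a different terminal and run `nc -lk \`hostname\` 6002`. Then type some lines of text, pressing Enter after each one. The lines you type will be displayed on the Spark console at the end of each batch. The host and port passed to socketTextStream in TestStreaming.scala must match the host and port that nc is listening on.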

No comments: