import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object SparkKafkaConsumer2 {
  def main(args: Array[String]) {
    // TODO: Print out line in log of authenticated user
    val Array(brokerlist, group, topics, numThreads) = args
    val kafkaParams = Map(
      "bootstrap.servers" -> brokerlist,
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> group,
      "security.protocol" -> "PLAINTEXTSASL",
      "auto.offset.reset" -> "smallest"
    )
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(100))
    // Direct (receiver-less) stream; topics may be a comma-separated list
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics.split(",").toSet)
    // TODO: change to be a variable
    kafkaStream.saveAsTextFiles("/tmp/streaming_output")
    ssc.start()
    ssc.awaitTermination()
  }
}
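
Despite the KafkaWordCount app name, the job above only persists the raw (key, value) records. A minimal word-count sketch over the message values (assuming whitespace-delimited text messages) could be added before saveAsTextFiles:

// Split each message value into words and count them per batch
val words = kafkaStream.map(_._2).flatMap(_.split(" "))
val wordCounts = words.map((_, 1L)).reduceByKey(_ + _)
wordCounts.print()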
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object Kafka_Word_Count {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("KafkaWordCount")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.driver.allowMultipleContexts", "true")
    val ssc = new StreamingContext(conf, Seconds(3))
    val groupID = "test"
    val numThreads = 2
    val topic = "kafkatopic"
    val topicMap = topic.split(",").map((_, numThreads)).toMap
    val kafkaParams = Map[String, String](
      "zookeeper.connect" -> "rks253secure.hdp.local:2181",
      "group.id" -> groupID,
      "zookeeper.connection.timeout.ms" -> "10000",
      "security.protocol" -> "PLAINTEXTSASL"
    )
    val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)
    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
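
In the receiver-based API, numThreads only adds consumer threads inside a single receiver. To consume in parallel across executors, the usual pattern is to create several receiver streams and union them; a sketch, reusing kafkaParams and topicMap from above:

val numStreams = 2
val streams = (1 to numStreams).map { _ =>
  KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2)
}
// Union the receiver streams into one DStream of message values
val lines = ssc.union(streams).map(_._2)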
Broker-side JAAS file (kafka_jaas.conf on the Kafka broker host), with login contexts for the broker itself, inter-broker communication, and its ZooKeeper client:

KafkaServer {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/etc/security/keytabs/kafka.service.keytab"
   storeKey=true
   useTicketCache=false
   serviceName="kafka"
   principal="kafka/rks253secure.hdp.local@EXAMPLE.COM";
};
KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   useTicketCache=true
   renewTicket=true
   serviceName="kafka";
};
Client {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/etc/security/keytabs/kafka.service.keytab"
   storeKey=true
   useTicketCache=false
   serviceName="zookeeper"
   principal="kafka/rks253secure.hdp.local@EXAMPLE.COM";
};
Client-side JAAS file used by the Spark job (the kafka_jaas.conf shipped via --files in the spark-submit command below). It authenticates from the keytab, so executors do not depend on a ticket cache:

KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/etc/security/keytabs/kafka.service.keytab"
   serviceName="kafka"
   principal="kafka/rks253secure.hdp.local@EXAMPLE.COM";
};
Client {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/etc/security/keytabs/kafka.service.keytab"
   storeKey=true
   useTicketCache=false
   serviceName="zookeeper"
   principal="kafka/rks253secure.hdp.local@EXAMPLE.COM";
};
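
As an alternative to passing --driver-java-options, the driver can point the JVM at the JAAS file programmatically before the StreamingContext is created (a sketch; this covers only the driver JVM, so executors still need the extraJavaOptions setting shown below):

// Must run before any Kafka or ZooKeeper client code initializes
System.setProperty("java.security.auth.login.config", "/etc/kafka/conf/kafka_jaas.conf")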
Obtain a Kerberos ticket as the kafka user before submitting:

kinit -kt /etc/security/keytabs/kafka.service.keytab kafka/rks253secure.hdp.local@EXAMPLE.COM

Then submit the job:
spark-submit --files /etc/kafka/conf/kafka_jaas.conf,/etc/security/keytabs/kafka.service.keytab \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/etc/kafka/conf/kafka_jaas.conf" \
  --driver-java-options "-Djava.security.auth.login.config=/etc/kafka/conf/kafka_jaas.conf" \
  --class SparkKafkaConsumer2 --master local[2] \
  /tmp/SparkKafkaSampleApp-1.0-SNAPSHOT-jar-with-dependencies.jar "rks253secure.hdp.local:6667" test kafkatopic 1