in src/main/scala/com/aliyun/emr/example/spark/streaming/SparkKafkaDemo.scala [26:68]
def main(args: Array[String]): Unit = {
  if (args.length < 3) {
    System.err.println(s"""
      |Usage: SparkKafkaDemo <brokers> <topics> <interval>
      |  <brokers> is a list of one or more Kafka brokers
      |  <topics> is a comma-separated list of one or more Kafka topics to consume from
      |  <interval> is the batch interval in seconds
    """.stripMargin)
    System.exit(1)
  }
  val Array(brokers, topics, interval) = args
  val sparkConf = new SparkConf().setAppName("E-MapReduce Demo 9: Spark Kafka Demo (Scala)")
  val ssc = new StreamingContext(sparkConf, Seconds(interval.toInt))
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> brokers,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "mugen1",
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> (false: java.lang.Boolean),
    "security.protocol" -> "SASL_PLAINTEXT",
    "sasl.mechanism" -> "GSSAPI",
    "sasl.kerberos.service.name" -> "kafka"
  )
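  // Note: security.protocol=SASL_PLAINTEXT with the GSSAPI mechanism means every JVM
  // running this consumer needs a Kerberos JAAS login. A minimal sketch, assuming a
  // JAAS file at a path of your choosing (path and keytab are placeholders, not part
  // of this demo): point driver and executors at it with
  //   --driver-java-options "-Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf"
  //   --conf spark.executor.extraJavaOptions="-Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf"
  // where the file defines a KafkaClient section using Krb5LoginModule with a keytab
  // for the consumer's principal.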
  val messages = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    // <topics> may name several topics, so split the comma-separated list
    ConsumerStrategies.Subscribe[String, String](topics.split(",").toSet, kafkaParams)
  )
  // Get the lines, split them into words, count the words and print
  val lines = messages.map(_.value)
  val words = lines.flatMap(_.split(" "))
  val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
  wordCounts.print()
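  // With enable.auto.commit=false, nothing above persists the consumed offsets.
  // A minimal sketch of committing each batch's offset ranges back to Kafka after
  // processing (an illustrative addition, not part of the original demo; requires
  // org.apache.spark.streaming.kafka010.{HasOffsetRanges, CanCommitOffsets} imported):
  messages.foreachRDD { rdd =>
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }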
  // Start the computation
  ssc.start()
  ssc.awaitTermination()
}
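
// Example submission (jar path, broker addresses, topic, and interval below are
// placeholders, not values from this demo):
//   spark-submit --class com.aliyun.emr.example.spark.streaming.SparkKafkaDemo \
//     examples.jar broker1:9092,broker2:9092 test-topic 5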