def main()

in src/main/scala/com/aliyun/emr/example/spark/streaming/SparkKafkaDemo.scala [26:68]


  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(s"""
                            |Usage: SparkKafkaDemo <brokers> <topics>
                            |  <brokers> is a list of one or more Kafka brokers
                            |  <topics> is a list of one or more kafka topics to consume from
                            |  <interval>
        """.stripMargin)
      System.exit(1)
    }
    val Array(brokers, topics, interval) = args

    val sparkConf = new SparkConf().setAppName("E-MapReduce Demo 9: Spark Kafka Demo (Scala)")
    val ssc = new StreamingContext(sparkConf, Seconds(interval.toInt))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "mugen1",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "security.protocol" -> "SASL_PLAINTEXT",
      "sasl.mechanism" -> "GSSAPI",
      "sasl.kerberos.service.name" -> "kafka"
    )

    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array(topics), kafkaParams)
    )

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_.value)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }