in src/main/scala/com/aliyun/emr/example/spark/streaming/benchmark/metrics/KafkaMetrics.scala [9:32]
def main(args: Array[String]): Unit = {
  // Load benchmark settings from the properties file passed as the first argument.
  val config = loadConfig(args(0))

  // The batch interval is driven by the configured metric duration (in seconds).
  val ssc = new StreamingContext(
    new SparkConf().setAppName("KafkaMetrics"),
    Seconds(config.getProperty("metric.duration.second").toLong))

  // Kafka consumer settings: start from the earliest offset on the result topic
  // and leave offset commits to the application rather than the consumer.
  val kafkaParam = Map[String, Object](
    "bootstrap.servers" -> config.getProperty("result.broker.list"),
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> config.getProperty("metric.group.id"),
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  // Create a direct stream against the result topic; PreferConsistent distributes
  // partitions evenly across the available executors.
  val messages = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](Array(config.getProperty("result.topic")), kafkaParam))

  // Persist each record's value; saveAsTextFiles writes one directory per batch,
  // named <filename.prefix><benchmark.app.name>/kafka-<batch time in ms>.
  val outputPath = config.getProperty("filename.prefix") + config.getProperty("benchmark.app.name") + "/kafka-"
  messages.map(_.value()).saveAsTextFiles(outputPath)

  ssc.start()
  ssc.awaitTermination()
}
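
The loadConfig helper referenced above is defined outside this excerpt. As a minimal sketch, assuming it does nothing more than load a java.util.Properties file from the supplied path, it could look like the following; the repository's actual implementation may differ.

// Hypothetical sketch only: the real loadConfig lives elsewhere in this file.
// It is assumed to return a java.util.Properties exposing the keys main reads:
//   metric.duration.second, result.broker.list, metric.group.id,
//   result.topic, filename.prefix, benchmark.app.name
import java.io.FileInputStream
import java.util.Properties

def loadConfig(path: String): Properties = {
  val props = new Properties()
  val in = new FileInputStream(path)
  try props.load(in) finally in.close()
  props
}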