in src/main/scala/com/aliyun/emr/example/spark/streaming/DirectSparkSLSDemo.scala [26:75]
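// This excerpt covers lines 26-75 of the file, so the imports above it are not
// shown. Assuming the usual packages for these classes, the code below relies on:
//   import org.apache.spark.SparkConf
//   import org.apache.spark.streaming.{Milliseconds, StreamingContext}
//   import org.apache.spark.streaming.aliyun.logservice.{DirectLoghubInputDStream, LoghubUtils}
//   import com.aliyun.openservices.loghub.client.config.LogHubCursorPosition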
def main(args: Array[String]): Unit = {
  if (args.length < 7) {
    System.err.println(
      """Usage: DirectSparkSLSDemo <sls project> <sls logstore> <loghub group name> <sls endpoint>
        | <access key id> <access key secret> <batch interval seconds> <zookeeper host:port=localhost:2181>
      """.stripMargin)
    System.exit(1)
  }
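
  // Positional arguments, in the order given in the usage string; the ZooKeeper
  // address is optional and defaults to localhost:2181.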
  val loghubProject = args(0)
  val logStore = args(1)
  val loghubGroupName = args(2)
  val endpoint = args(3)
  val accessKeyId = args(4)
  val accessKeySecret = args(5)
  val batchInterval = Milliseconds(args(6).toInt * 1000)
  val zkAddress = if (args.length >= 8) args(7) else "localhost:2181"
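
  // Invoked by StreamingContext.getOrCreate below only when no checkpoint exists
  // yet; everything that must survive a driver restart is defined inside it.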
  def functionToCreateContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("E-MapReduce Demo 6-2: Spark SLS Demo (Scala) (Direct API)")
    val ssc = new StreamingContext(conf, batchInterval)
    val zkParams = Map(
      "zookeeper.connect" -> zkAddress,
      "enable.auto.commit" -> "false")
    val loghubStream = LoghubUtils.createDirectStream(
      ssc,
      loghubProject,
      logStore,
      loghubGroupName,
      accessKeyId,
      accessKeySecret,
      endpoint,
      zkParams,
      LogHubCursorPosition.END_CURSOR)
    loghubStream.checkpoint(batchInterval).foreachRDD { rdd =>
      // Key each log line by its length and report how many distinct lengths occurred.
      println(s"count by key: ${rdd.map(s => (s.length, s)).countByKey().size}")
      // Commit the consumed offsets to ZooKeeper once the batch has been processed.
      loghubStream.asInstanceOf[DirectLoghubInputDStream].commitAsync()
    }
    ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
    ssc
  }
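
  // getOrCreate recovers the StreamingContext from the checkpoint directory if
  // one exists (e.g. after a driver restart); otherwise it builds a fresh
  // context via functionToCreateContext.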
  val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)
  ssc.start()
  ssc.awaitTermination()
}
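
// A sketch of how this demo might be submitted; the jar name, master setting,
// and all argument values are placeholders, not values from this repository:
//
//   spark-submit --master yarn-cluster \
//     --class com.aliyun.emr.example.spark.streaming.DirectSparkSLSDemo \
//     examples.jar my-project my-logstore my-consumer-group \
//     http://cn-hangzhou.log.aliyuncs.com <access-key-id> <access-key-secret> \
//     5 emr-header-1:2181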