in src/main/scala/com/aliyun/emr/example/spark/streaming/SparkSLSDemo.scala [27:66]
/**
 * Entry point for the Spark Streaming SLS (Aliyun Log Service) demo.
 *
 * Expected arguments (in order): SLS project, SLS logstore, loghub consumer
 * group name, SLS endpoint, access key id, access key secret, and the batch
 * interval in seconds. Exits with status 1 when fewer than seven are given.
 *
 * Recovers the StreamingContext from the checkpoint directory when one
 * exists; otherwise builds a fresh context that logs the record count of
 * each received batch.
 */
def main(args: Array[String]): Unit = {
  if (args.length < 7) {
    System.err.println(
      """Usage: SparkSLSDemo <sls project> <sls logstore> <loghub group name> <sls endpoint>
        | <access key id> <access key secret> <batch interval seconds>
      """.stripMargin)
    System.exit(1)
  }
  val loghubProject = args(0)
  val logStore = args(1)
  val loghubGroupName = args(2)
  val endpoint = args(3)
  val accessKeyId = args(4)
  val accessKeySecret = args(5)
  // Widen to Long BEFORE multiplying: `args(6).toInt * 1000` overflows Int
  // for intervals beyond ~24.8 days' worth of seconds.
  val batchInterval = Milliseconds(args(6).toLong * 1000)

  // Factory used by getOrCreate when no checkpoint exists yet.
  def functionToCreateContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("E-MapReduce Demo 6-1: Spark SLS Demo (Scala)")
    val ssc = new StreamingContext(conf, batchInterval)
    val loghubStream = LoghubUtils.createStream(
      ssc,
      loghubProject,
      logStore,
      loghubGroupName,
      endpoint,
      accessKeyId,
      accessKeySecret,
      StorageLevel.MEMORY_AND_DISK)
    // Demo action only: print the per-batch record count on the driver.
    loghubStream.foreachRDD(rdd => println(s"rdd.count(): ${rdd.count()}"))
    ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
    ssc
  }

  // Resume from checkpoint if present so the job survives driver restarts.
  val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)
  ssc.start()
  ssc.awaitTermination()
}