in src/main/scala/com/aliyun/emr/example/spark/streaming/SparkMNSDemo.scala [26:51]
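// Spark Streaming demo: continuously pull messages from an Aliyun MNS queue and print each message body.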
def main(args: Array[String]): Unit = {
  if (args.length < 4) {
    System.err.println(
      """Usage: spark-submit --class SparkMNSDemo examples-1.0-SNAPSHOT-shaded.jar <queueName> <accessKeyId> <accessKeySecret> <endpoint>""".stripMargin)
    System.exit(1)
  }
  val queueName = args(0)
  val accessKeyId = args(1)
  val accessKeySecret = args(2)
  val endpoint = args(3)
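  // Demo configuration: run in local mode and register the Aliyun OSS FileSystem implementation for oss:// paths.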
  val conf = new SparkConf()
    .setAppName("E-MapReduce Demo 8-1: Spark MNS Demo (Scala)")
    .setMaster("local[4]")
  conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
  conf.set("spark.hadoop.mapreduce.job.run-local", "true")
  val batchInterval = Seconds(10)
  val ssc = new StreamingContext(conf, batchInterval)
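  // Pulling DStream that delivers each MNS message body as a byte array.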
  val mnsStream = MnsUtils.createPullingStreamAsBytes(ssc, queueName, accessKeyId, accessKeySecret, endpoint,
    StorageLevel.MEMORY_ONLY)
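  // For each 10-second batch, collect the messages to the driver and print every message body.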
  mnsStream.foreachRDD { rdd =>
    rdd.collect().foreach(bytes => println(new String(bytes)))
  }
  ssc.start()
  ssc.awaitTermination()
}
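
If only a sample of each batch needs to be displayed, the collect-and-println loop can be avoided; a minimal sketch using plain DStream operators, assuming the `mnsStream` declared above and UTF-8 message bodies:

  // Decode each message body on the executors; DStream.print() then shows the
  // first 10 elements of every batch on the driver.
  val bodies = mnsStream.map(bytes => new String(bytes, java.nio.charset.StandardCharsets.UTF_8))
  bodies.print()

These two lines would stand in for the foreachRDD block above and, like it, must be set up before ssc.start() is called.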