def main()

in src/main/scala/com/aliyun/emr/example/spark/streaming/SparkMNSDemo.scala [26:51]


  /**
   * Entry point of the Spark Streaming MNS demo.
   *
   * Expects four positional arguments, in this order:
   * `queueName`, `accessKeyId`, `accessKeySecret`, `endpoint`.
   * With fewer than four arguments the usage text is written to stderr and the
   * JVM exits with status 1. Otherwise a streaming context with a 10-second
   * batch interval is started, every message pulled from the queue is printed
   * to stdout, and the call blocks until the streaming context terminates.
   *
   * @param args command-line arguments in the order listed above
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println(
        """Usage: spark-submit --class SparkMNSDemo examples-1.0-SNAPSHOT-shaded.jar <queueName> <accessKeyId> <accessKeySecret> <endpoint>""")
      System.exit(1)
    }
    val queueName = args(0)
    val accessKeyId = args(1)
    val accessKeySecret = args(2)
    val endpoint = args(3)

    // NOTE(review): the master is hard-coded to local[4], which takes precedence over
    // any master passed on the spark-submit command line. Kept as-is for the demo.
    val conf = new SparkConf()
      .setAppName("E-MapReduce Demo 8-1: Spark MNS Demo (Scala)")
      .setMaster("local[4]")
      .set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
      .set("spark.hadoop.mapreduce.job.run-local", "true")
    val batchInterval = Seconds(10)
    val ssc = new StreamingContext(conf, batchInterval)

    val mnsStream = MnsUtils.createPullingStreamAsBytes(
      ssc, queueName, accessKeyId, accessKeySecret, endpoint, StorageLevel.MEMORY_ONLY)

    mnsStream.foreachRDD { rdd =>
      // collect() brings the whole batch back to the driver so the output shows up on
      // the driver's stdout; acceptable for a demo, not for large batches.
      rdd.collect().foreach { payload =>
        // Decode with an explicit charset: new String(bytes) would use the JVM's
        // platform default and garble non-ASCII text on drivers with another default.
        // Assumes message bodies are UTF-8 text - TODO confirm against the producers.
        println(new String(payload, java.nio.charset.StandardCharsets.UTF_8))
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }