def main()

in src/main/scala/com/aliyun/emr/example/spark/streaming/SparkRocketMQDemo.scala [30:80]


  /**
   * Entry point of the Spark RocketMQ (ONS) streaming demo.
   *
   * Creates `parallelism` ONS input streams, unions them, and prints the number of
   * messages received in every 2-second batch until the streaming context terminates.
   *
   * Exits with status 1 after printing a message to stderr when fewer than six
   * arguments are supplied or when `parallelism` is not a positive integer.
   *
   * @param args `accessKeyId accessKeySecret consumerId topic subExpression parallelism`
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 6) {
      System.err.println(
        """Usage: bin/spark-submit --class com.aliyun.emr.example.spark.streaming.SparkRocketMQDemo examples-1.0-SNAPSHOT-shaded.jar <accessKeyId> <accessKeySecret>
          |         <consumerId> <topic> <subExpression> <parallelism>
          |
          |Arguments:
          |
          |    accessKeyId      Aliyun Access Key ID.
          |    accessKeySecret  Aliyun Key Secret.
          |    consumerId       ONS ConsumerID.
          |    topic            ONS topic.
          |    subExpression    * for all, or some specific tag.
          |    parallelism      The number of receivers.
          |
        """.stripMargin)
      System.exit(1)
    }

    val accessKeyId = args(0)
    val accessKeySecret = args(1)
    val cId = args(2)
    val topic = args(3)
    val subExpression = args(4)
    val parallelism = args(5)

    // Validate up front: a non-numeric value would otherwise surface as a bare
    // NumberFormatException, and a non-positive value would create no receivers
    // and leave nothing to union below.
    val numStreams = scala.util.Try(parallelism.trim.toInt).toOption.filter(_ > 0).getOrElse {
      System.err.println(s"Invalid parallelism '$parallelism': expected a positive integer.")
      sys.exit(1)
    }
    val batchInterval = Milliseconds(2000)

    val conf = new SparkConf().setAppName("E-MapReduce Demo 4-1: Spark RocketMQ Demo (Scala)")
    val ssc = new StreamingContext(conf, batchInterval)

    // Keep only the raw body bytes of each received message.
    val extractBody: Message => Array[Byte] = msg => msg.getBody

    // One ONS input stream per requested receiver.
    val onsStreams = (0 until numStreams).map { i =>
      println(s"starting stream $i")
      OnsUtils.createStream(
        ssc,
        cId,
        topic,
        subExpression,
        accessKeyId,
        accessKeySecret,
        StorageLevel.MEMORY_AND_DISK_2,
        extractBody)
    }

    // Merge all receivers into a single stream and report the per-batch message count.
    val unionStreams = ssc.union(onsStreams)
    unionStreams.foreachRDD(rdd => println(s"count: ${rdd.count()}"))

    ssc.start()
    ssc.awaitTermination()
  }