in src/main/scala/com/aliyun/emr/example/spark/streaming/SparkRocketMQDemo.scala [30:80]
/**
 * Entry point: builds `parallelism` receiver streams through `OnsUtils.createStream`,
 * unions them, and prints the record count of every batch.
 *
 * Prints a usage message and exits with status 1 when fewer than six arguments are
 * supplied, or when `parallelism` is not a positive integer.
 *
 * @param args command-line arguments, in order: accessKeyId, accessKeySecret,
 *             consumerId, topic, subExpression, parallelism
 */
def main(args: Array[String]): Unit = {
  if (args.length < 6) {
    System.err.println(
      """Usage: bin/spark-submit --class com.aliyun.emr.example.spark.streaming.SparkRocketMQDemo examples-1.0-SNAPSHOT-shaded.jar <accessKeyId> <accessKeySecret>
        | <consumerId> <topic> <subExpression> <parallelism>
        |
        |Arguments:
        |
        | accessKeyId Aliyun Access Key ID.
        | accessKeySecret Aliyun Key Secret.
        | consumerId ONS ConsumerID.
        | topic ONS topic.
        | subExpression * for all, or some specific tag.
        | parallelism The number of receivers.
        |
        |""".stripMargin)
    System.exit(1)
  }

  val accessKeyId = args(0)
  val accessKeySecret = args(1)
  val cId = args(2)
  val topic = args(3)
  val subExpression = args(4)

  // Validate the receiver count before any Spark object is created: a non-numeric
  // value would otherwise surface as a raw NumberFormatException, and a value below 1
  // would make the range below empty, leaving ssc.union with no streams to combine.
  val numStreams = scala.util.Try(args(5).toInt).toOption.filter(_ > 0).getOrElse {
    System.err.println(s"Invalid <parallelism> '${args(5)}': expected a positive integer.")
    sys.exit(1)
  }

  val batchInterval = Milliseconds(2000)
  val conf = new SparkConf().setAppName("E-MapReduce Demo 4-1: Spark RocketMQ Demo (Scala)")
  val ssc = new StreamingContext(conf, batchInterval)

  // Each received message is reduced to its body bytes.
  val func: Message => Array[Byte] = msg => msg.getBody

  // One input stream per requested receiver.
  val onsStreams = (0 until numStreams).map { i =>
    println(s"starting stream $i")
    OnsUtils.createStream(
      ssc,
      cId,
      topic,
      subExpression,
      accessKeyId,
      accessKeySecret,
      StorageLevel.MEMORY_AND_DISK_2,
      func)
  }

  // Merge all receiver streams and report the record count of every batch.
  val unionStreams = ssc.union(onsStreams)
  unionStreams.foreachRDD(rdd => println(s"count: ${rdd.count()}"))

  ssc.start()
  ssc.awaitTermination()
}