in src/main/scala/com/aliyun/emr/example/spark/streaming/SparkDatahubDemo.scala [29:96]
def main(args: Array[String]): Unit = {
  if (args.length < 7) {
    // scalastyle:off
    System.err.println(
      """Usage: SparkDatahubDemo <project> <topic> <subscribe Id> <access key id>
        | <access key secret> <endpoint> <batch interval seconds> [<shard Id>]
      """.stripMargin)
    // scalastyle:on
    System.exit(1)
  }
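
  // The optional eighth argument pins the stream to a single DataHub shard;
  // without it the stream consumes every shard of the topic.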
  val isShardDefined = args.length == 8

  val project = args(0)
  val topic = args(1)
  val subId = args(2)
  val accessKeyId = args(3)
  val accessKeySecret = args(4)
  val endpoint = args(5)
  val batchInterval = Milliseconds(args(6).toInt * 1000)
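
  // Factory for StreamingContext.getOrCreate: it is only invoked when no
  // checkpoint data exists yet under the checkpoint directory.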
  def functionToCreateContext(): StreamingContext = {
    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("E-MapReduce Demo 11: Spark DataHub Demo (Scala)")
    conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
    conf.set("spark.hadoop.mapreduce.job.run-local", "true")
    val ssc = new StreamingContext(conf, batchInterval)
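
    // read(_) is the record-parsing function handed to DatahubUtils.createStream;
    // it is defined elsewhere in this file and maps each DataHub record to the
    // payload carried by the resulting DStream.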
    val datahubStream: DStream[Array[Byte]] =
      if (isShardDefined) {
        val shardId = args(7)
        DatahubUtils.createStream(
          ssc,
          project,
          topic,
          subId,
          accessKeyId,
          accessKeySecret,
          endpoint,
          shardId,
          read(_),
          StorageLevel.MEMORY_AND_DISK)
      } else {
        DatahubUtils.createStream(
          ssc,
          project,
          topic,
          subId,
          accessKeyId,
          accessKeySecret,
          endpoint,
          read(_),
          StorageLevel.MEMORY_AND_DISK)
      }
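
    // A trivial output action: print the number of records in each micro-batch.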
    // scalastyle:off
    datahubStream.foreachRDD(rdd => println(s"rdd.count(): ${rdd.count()}"))
    // scalastyle:on
    ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
    ssc
  }
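
  // Recover the context from the checkpoint directory if one exists;
  // otherwise build a fresh context with the factory above.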
  val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)
  ssc.start()
  ssc.awaitTermination()
}
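
// A minimal sketch of how this demo might be submitted; the jar path below is a
// placeholder, not part of this repository's build output:
//
//   spark-submit --class com.aliyun.emr.example.spark.streaming.SparkDatahubDemo \
//     examples.jar <project> <topic> <subscribe Id> <access key id> \
//     <access key secret> <endpoint> <batch interval seconds> [<shard Id>]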