in src/main/scala/com/aliyun/emr/example/spark/streaming/DtsSample.scala [28:62]
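// Spark Streaming example: receive change records from Aliyun DTS (Data Transmission Service) and print each one.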
def main(args: Array[String]): Unit = {
  if (args.length < 5) {
    System.err.println(s"""
      |Usage: DtsSample <accessKeyId> <accessKeySecret> <guid> <usePublicIp> <interval-millis>
      |  <accessKeyId>      Aliyun Access Key ID.
      |  <accessKeySecret>  Aliyun Access Key Secret.
      |  <guid>             Aliyun DTS guid name.
      |  <usePublicIp>      Whether to use a public IP to access DTS (true or false).
      |  <interval-millis>  The time interval in milliseconds at which streaming data will be divided into batches.
      """.stripMargin)
    System.exit(1)
  }
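  // Bind the five positional arguments; interval is the micro-batch duration in milliseconds.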
  val Array(accessKeyId, accessKeySecret, guid, usePublicIp, interval) = args
  val sparkConf = new SparkConf().setAppName("DtsSample")
  val ssc: StreamingContext = new StreamingContext(sparkConf, Milliseconds(interval.toInt))
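  // Convert each DTS ClusterMessage into a printable string by extracting its record.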
  def func: ClusterMessage => String = msg => msg.getRecord.toString
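  // Create an input DStream that receives change records from the DTS channel identified by guid.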
  val dtsStream = DtsUtils.createStream(
    ssc,
    accessKeyId,
    accessKeySecret,
    guid,
    func,
    StorageLevel.MEMORY_AND_DISK_2,
    usePublicIp.toBoolean)
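  // Print every record of each micro-batch on the driver; collect() is only appropriate for small sample volumes.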
  dtsStream.foreachRDD(rdd => {
    rdd.collect().foreach(println)
  })
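  // Start the streaming job and block the main thread until it is terminated.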
  ssc.start()
  ssc.awaitTermination()
}