def main()

in src/main/scala/com/aliyun/emr/example/spark/streaming/DtsSample.scala [28:62]


  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println(s"""
                            |Usage: DtsSample <accessKeyId> <accessKeySecret> <guid> <usePublicIp> <interval-mills>
                            |  <accessKeyId>      Aliyun Access Key ID.
                            |  <accessKeySecret>  Aliyun Access Key Secret.
                            |  <guid>             Aliyun DTS guid name.
                            |  <usePublicIp>      Use public Ip to access DTS or not.
                            |  <interval-mills>   The time interval at which streaming data will be divided into batches.
        """.stripMargin)
      System.exit(1)
    }

    val Array(accessKeyId, accessKeySecret, guid, usePublicIp, interval) = args
    val sparkConf = new SparkConf().setAppName("DtsSample")
    val ssc: StreamingContext = new StreamingContext(sparkConf, Milliseconds(interval.toInt))

    def func: ClusterMessage => String = msg => msg.getRecord.toString

    val dtsStream = DtsUtils.createStream(
      ssc,
      accessKeyId,
      accessKeySecret,
      guid,
      func,
      StorageLevel.MEMORY_AND_DISK_2,
      usePublicIp.toBoolean)

    dtsStream.foreachRDD(rdd => {
      rdd.collect().foreach(println)
    })

    ssc.start()
    ssc.awaitTermination()
  }