def main()

in src/main/scala/com/aliyun/emr/example/spark/streaming/SparkDatahubDemo.scala [29:96]
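
This excerpt shows only the method body, so the file's imports are omitted. A plausible minimal set for the calls used here (the package names for the DataHub connector and RecordEntry are assumptions based on the aliyun-emapreduce SDK):

  import com.aliyun.datahub.model.RecordEntry
  import org.apache.spark.SparkConf
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.aliyun.datahub.DatahubUtils
  import org.apache.spark.streaming.dstream.DStream
  import org.apache.spark.streaming.{Milliseconds, StreamingContext}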


  def main(args: Array[String]): Unit = {
    if (args.length < 7) {
      // scalastyle:off
      System.err.println(
        """Usage: SparkDatahubDemo <project> <topic> <subscribe Id> <access key id>
          |         <access key secret> <endpoint> <batch interval seconds> [<shard Id>]
        """.stripMargin)
      // scalastyle:on
      System.exit(1)
    }
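
    // Example (hypothetical values): passing
    //   my_project my_topic my_sub_id <access_id> <access_secret> https://dh-cn-hangzhou.aliyuncs.com 5
    // consumes all shards with a 5-second batch interval; an optional eighth
    // argument restricts consumption to a single shard.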

    val isShardDefined = args.length == 8

    val project = args(0)
    val topic = args(1)
    val subId = args(2)
    val accessKeyId = args(3)
    val accessKeySecret = args(4)
    val endpoint = args(5)
    val batchInterval = Milliseconds(args(6).toInt * 1000) // batch interval argument is given in seconds

    def functionToCreateContext(): StreamingContext = {
      // setMaster("local[4]") runs the demo locally; remove it when submitting to a cluster.
      val conf = new SparkConf().setMaster("local[4]").setAppName("E-MapReduce Demo 11: Spark DataHub Demo (Scala)")
      conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
      conf.set("spark.hadoop.mapreduce.job.run-local", "true")
      val ssc = new StreamingContext(conf, batchInterval)
      // Subscribe to a single shard when a shard Id was supplied,
      // otherwise consume every shard of the topic.
      val datahubStream: DStream[Array[Byte]] = if (isShardDefined) {
        val shardId = args(7)
        DatahubUtils.createStream(
          ssc,
          project,
          topic,
          subId,
          accessKeyId,
          accessKeySecret,
          endpoint,
          shardId,
          read(_),
          StorageLevel.MEMORY_AND_DISK)
      } else {
        DatahubUtils.createStream(
          ssc,
          project,
          topic,
          subId,
          accessKeyId,
          accessKeySecret,
          endpoint,
          read(_),
          StorageLevel.MEMORY_AND_DISK)
      }

      // For the demo, each micro-batch is simply counted and printed.
      // scalastyle:off
      datahubStream.foreachRDD(rdd => println(s"rdd.count(): ${rdd.count()}"))
      // scalastyle:on
      ssc.checkpoint("hdfs:///tmp/spark/streaming") // checkpoint directory; must match the path passed to getOrCreate below
      ssc
    }

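    // getOrCreate first looks for a checkpoint under hdfs:///tmp/spark/streaming;
    // if one exists, the previous StreamingContext (with its DStream lineage) is
    // restored and functionToCreateContext() is not invoked. Otherwise a fresh
    // context is built and checkpointed.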
    val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)

    ssc.start()
    ssc.awaitTermination()
  }
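
The read(_) passed to DatahubUtils.createStream above is a record deserializer defined elsewhere in the file, outside this excerpt. A minimal sketch, assuming the connector expects a RecordEntry => String function and that the payload sits in the record's first field:

  // Hypothetical sketch -- the demo's actual read(...) is defined elsewhere in the file.
  // Extracts the first field of each DataHub record as the message payload.
  def read(record: RecordEntry): String = {
    record.getString(0)
  }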