def main()

in src/main/scala/com/aliyun/emr/example/spark/sql/streaming/SparkSLSStructuredStreamingDemo.scala [24:71]


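  // NOTE: this excerpt assumes the file's earlier imports are in scope,
  // notably java.util.UUID and org.apache.spark.sql.SparkSession.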
  def main(args: Array[String]): Unit = {
    if (args.length < 7) {
      System.err.println("Usage: SparkSLSStructuredStreamingDemo <logService-project> " +
        "<logService-store> <access-key-id> <access-key-secret> <endpoint> " +
        "<starting-offsets> <max-offsets-per-trigger> [<checkpoint-location>]")
      System.exit(1)
    }

    val Array(project, logStore, accessKeyId, accessKeySecret, endpoint, startingOffsets, maxOffsetsPerTrigger, _*) = args
    val checkpointLocation =
      if (args.length > 7) args(7) else "/tmp/temporary-" + UUID.randomUUID.toString

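    // NOTE: master("local[5]") is hard-coded so the demo runs standalone;
    // when submitting to a cluster this would normally be left to spark-submit.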
    val spark = SparkSession
      .builder
      .appName("E-MapReduce Demo 6-3: Spark SLS Demo (Scala)")
      .master("local[5]")
      .getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    import spark.implicits._

    // Create a Dataset representing the stream of input lines from Loghub
    val lines = spark
      .readStream
      .format("org.apache.spark.sql.aliyun.logservice.LoghubSourceProvider")
      .option("sls.project", project)
      .option("sls.store", logStore)
      .option("access.key.id", accessKeyId)
      .option("access.key.secret", accessKeySecret)
      .option("endpoint", endpoint)
      .option("startingoffsets", startingOffsets)
      .option("zookeeper.connect.address", "localhost:2181")
      .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
      .load()
      .selectExpr("CAST(__value__ AS STRING)")
      .as[String]

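    // After .as[String] and flatMap, the resulting Dataset[String] exposes a
    // single column named "value", so that is the column to group by.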
    val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .option("checkpointLocation", checkpointLocation)
      .start()

    query.awaitTermination()
  }
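
A typical local run (the connector jar name is illustrative; the argument
placeholders follow the usage message above) might look like:

  spark-submit \
    --class com.aliyun.emr.example.spark.sql.streaming.SparkSLSStructuredStreamingDemo \
    --jars <loghub-connector-jar> \
    <examples-jar> <logService-project> <logService-store> <access-key-id> \
    <access-key-secret> <endpoint> <starting-offsets> <max-offsets-per-trigger> \
    [<checkpoint-location>]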