in src/main/scala/com/aliyun/emr/example/spark/sql/streaming/SparkSLSStructuredStreamingDemo.scala [24:71]
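// Imports are elided by this excerpt; at minimum the file needs:
//   import java.util.UUID
//   import org.apache.spark.sql.SparkSession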
def main(args: Array[String]): Unit = {
  if (args.length < 7) {
    System.err.println("Usage: SparkSLSStructuredStreamingDemo <logService-project> " +
      "<logService-store> <access-key-id> <access-key-secret> <endpoint> " +
      "<starting-offsets> <max-offsets-per-trigger> [<checkpoint-location>]")
    System.exit(1)
  }
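
  // Bind the seven required positional arguments; `_*` discards any extras
  // (the optional eighth argument is read back from args below).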
  val Array(project, logStore, accessKeyId, accessKeySecret, endpoint,
    startingOffsets, maxOffsetsPerTrigger, _*) = args
  val checkpointLocation =
    if (args.length > 7) args(7) else "/tmp/temporary-" + UUID.randomUUID.toString
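
  // local[5] keeps the demo self-contained; drop .master() when spark-submit
  // supplies a real cluster master.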
  val spark = SparkSession
    .builder
    .appName("E-MapReduce Demo 6-3: Spark SLS Demo (Scala)")
    .master("local[5]")
    .getOrCreate()
  spark.sparkContext.setLogLevel("WARN")
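
  // Needed for the .as[String] encoder below.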
  import spark.implicits._
  // Create a Dataset representing the stream of input lines from LogHub.
  val lines = spark
    .readStream
    .format("org.apache.spark.sql.aliyun.logservice.LoghubSourceProvider")
    .option("sls.project", project)
    .option("sls.store", logStore)
    .option("access.key.id", accessKeyId)
    .option("access.key.secret", accessKeySecret)
    .option("endpoint", endpoint)
    .option("startingoffsets", startingOffsets)
    .option("zookeeper.connect.address", "localhost:2181")
    .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
    .load()
    .selectExpr("CAST(__value__ AS STRING)")
    .as[String]
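
  // flatMap yields a Dataset[String] whose single column is named "value",
  // so the aggregation must group by "value", not the source's "__value__".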
  val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
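
  // Print the continuously updated counts to stdout. The console sink is for
  // demos only; "complete" mode re-emits the full aggregate table each trigger.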
  val query = wordCounts.writeStream
    .outputMode("complete")
    .format("console")
    .option("checkpointLocation", checkpointLocation)
    .start()

  query.awaitTermination()
}