in src/main/java/com/aliyun/emr/example/spark/sql/streaming/SparkSLSStructuredStreamingJavaDemo.java [31:86]
/**
 * Entry point for the E-MapReduce Structured Streaming demo: reads log records
 * from an Aliyun SLS (Log Service) store, splits each record's {@code __value__}
 * payload into whitespace-separated words, and continuously prints the running
 * word counts to the console.
 *
 * <p>Expected arguments (in order): logService-project, logService-store,
 * access-key-id, access-key-secret, endpoint, starting-offsets,
 * max-offsets-per-trigger, and an optional checkpoint-location.
 *
 * @param args command-line arguments as described above
 * @throws Exception if the streaming query fails or is interrupted
 */
public static void main(String[] args) throws Exception {
    // Seven arguments are mandatory; bail out with usage text otherwise.
    if (args.length < 7) {
        System.err.println("Usage: SparkSLSStructuredStreamingJavaDemo <logService-project> " +
            "<logService-store> <access-key-id> <access-key-secret> " +
            "<endpoint> <starting-offsets> <max-offsets-per-trigger> [<checkpoint-location>]");
        System.exit(1);
    }

    final String project = args[0];
    final String store = args[1];
    final String keyId = args[2];
    final String keySecret = args[3];
    final String slsEndpoint = args[4];
    final String offsets = args[5];
    final String maxOffsets = args[6];
    // Use the optional eighth argument when present; otherwise fall back to a
    // throwaway temp directory (string concatenation invokes UUID.toString()).
    final String checkpointDir = args.length > 7
        ? args[7]
        : "/tmp/temporary-" + UUID.randomUUID();

    final SparkSession spark = SparkSession
        .builder()
        .master("local[5]")
        .appName("E-MapReduce Demo 6-4: Spark SLS Demo (Java)")
        .getOrCreate();
    // Quiet down Spark's default INFO chatter so console sink output is readable.
    spark.sparkContext().setLogLevel("WARN");

    // Stream the raw SLS record payload (__value__) as plain strings.
    final Dataset<String> lines = spark.readStream()
        .format("org.apache.spark.sql.aliyun.logservice.LoghubSourceProvider")
        .option("sls.project", project)
        .option("sls.store", store)
        .option("access.key.id", keyId)
        .option("access.key.secret", keySecret)
        .option("endpoint", slsEndpoint)
        .option("startingoffsets", offsets)
        .option("zookeeper.connect.address", "localhost:2181")
        .option("maxOffsetsPerTrigger", maxOffsets)
        .load()
        .selectExpr("CAST(__value__ AS STRING)")
        .as(Encoders.STRING());

    // Tokenize each record on single spaces and maintain a running count per word.
    final FlatMapFunction<String, String> tokenize =
        record -> Arrays.asList(record.split(" ")).iterator();
    final Dataset<Row> wordCounts = lines
        .flatMap(tokenize, Encoders.STRING())
        .groupBy("value")
        .count();

    // Emit the complete (full-table) counts to the console each trigger and
    // block until the query terminates.
    final StreamingQuery query = wordCounts.writeStream()
        .outputMode("complete")
        .format("console")
        .option("checkpointLocation", checkpointDir)
        .start();
    query.awaitTermination();
}