in src/main/java/com/aliyun/emr/example/spark/sql/streaming/SparkSLSContinuousStructuredStreamingJavaDemo.java [29:79]
public static void main(String[] args) throws Exception {
    if (args.length < 7) {
        System.err.println("Usage: SparkSLSContinuousStructuredStreamingJavaDemo <logService-project> " +
            "<logService-store> <access-key-id> <access-key-secret> " +
            "<endpoint> <starting-offsets> <max-offsets-per-trigger> [<checkpoint-location>]");
        System.exit(1);
    }
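    // Positional arguments: SLS project and logstore, Aliyun credentials, service endpoint,
    // starting offsets, max offsets per trigger, and an optional checkpoint directory.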
    String logProject = args[0];
    String logStore = args[1];
    String accessKeyId = args[2];
    String accessKeySecret = args[3];
    String endpoint = args[4];
    String startingOffsets = args[5];
    String maxOffsetsPerTrigger = args[6];
    String checkpointLocation = "/tmp/temporary-" + UUID.randomUUID().toString();
    if (args.length > 7) {
        checkpointLocation = args[7];
    }
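    // Build a SparkSession; the demo hard-codes a local master with five threads,
    // so it runs without a cluster.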
    SparkSession spark = SparkSession
        .builder()
        .master("local[5]")
        .appName("E-MapReduce Demo 6-6: Spark SLS Demo (Java)")
        .getOrCreate();
    spark.sparkContext().setLogLevel("WARN");
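    // Read from LogService (SLS) through the Loghub structured streaming source
    // and keep only the __value__ column, cast to a string.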
    Dataset<String> lines = spark.readStream()
        .format("org.apache.spark.sql.aliyun.logservice.LoghubSourceProvider")
        .option("sls.project", logProject)
        .option("sls.store", logStore)
        .option("access.key.id", accessKeyId)
        .option("access.key.secret", accessKeySecret)
        .option("endpoint", endpoint)
        .option("startingoffsets", startingOffsets)
        .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
        .load()
        .selectExpr("CAST(__value__ AS STRING)")
        .as(Encoders.STRING());
    // Start a continuous-mode query that prints the incoming log lines to the console
    StreamingQuery query = lines.writeStream()
        .outputMode("append")
        .format("console")
        .option("checkpointLocation", checkpointLocation)
        .trigger(Trigger.Continuous("5 second"))
        .start();
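    // Block the driver until the query stops, either manually or on failure.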
    query.awaitTermination();
}