public static void main()

in src/main/java/com/aliyun/emr/example/spark/sql/streaming/SparkSLSStructuredStreamingJavaDemo.java [31:86]


  /**
   * Entry point of the demo: streams records from an Aliyun Log Service (SLS) logstore
   * and prints a running word count to the console.
   *
   * <p>Expected arguments, in order:
   * <ol>
   *   <li>{@code logService-project}</li>
   *   <li>{@code logService-store}</li>
   *   <li>{@code access-key-id}</li>
   *   <li>{@code access-key-secret}</li>
   *   <li>{@code endpoint}</li>
   *   <li>{@code starting-offsets}</li>
   *   <li>{@code max-offsets-per-trigger}</li>
   *   <li>{@code checkpoint-location} (optional; when omitted a path of the form
   *       {@code /tmp/temporary-<random UUID>} is used)</li>
   * </ol>
   *
   * <p>When fewer than seven arguments are supplied, a usage message is written to
   * standard error and the JVM exits with status 1.
   *
   * @param args command-line arguments as listed above
   * @throws Exception if starting or awaiting the streaming query fails
   */
  public static void main(String[] args) throws Exception {
    if (args.length < 7) {
      String usage = "Usage: SparkSLSStructuredStreamingJavaDemo <logService-project> " +
          "<logService-store> <access-key-id> <access-key-secret> " +
          "<endpoint> <starting-offsets> <max-offsets-per-trigger> [<checkpoint-location>]";
      System.err.println(usage);
      System.exit(1);
    }

    // Positional arguments; the eighth one is optional.
    String project = args[0];
    String store = args[1];
    String keyId = args[2];
    String keySecret = args[3];
    String slsEndpoint = args[4];
    String offsetsToStartFrom = args[5];
    String offsetsPerTrigger = args[6];
    String checkpointDir = args.length > 7
        ? args[7]
        : "/tmp/temporary-" + UUID.randomUUID();

    // The master is fixed to a local one with 5 threads for this demo.
    SparkSession session = SparkSession.builder()
        .master("local[5]")
        .appName("E-MapReduce Demo 6-4: Spark SLS Demo (Java)")
        .getOrCreate();
    session.sparkContext().setLogLevel("WARN");

    // Streaming source backed by the Loghub provider.
    Dataset<Row> rawLogs = session.readStream()
        .format("org.apache.spark.sql.aliyun.logservice.LoghubSourceProvider")
        .option("sls.project", project)
        .option("sls.store", store)
        .option("access.key.id", keyId)
        .option("access.key.secret", keySecret)
        .option("endpoint", slsEndpoint)
        .option("startingoffsets", offsetsToStartFrom)
        .option("zookeeper.connect.address", "localhost:2181")
        .option("maxOffsetsPerTrigger", offsetsPerTrigger)
        .load();

    // Keep only the __value__ column, viewed as plain strings.
    Dataset<String> lines = rawLogs
        .selectExpr("CAST(__value__ AS STRING)")
        .as(Encoders.STRING());

    // Split every line on single spaces, then count the occurrences of each token.
    FlatMapFunction<String, String> splitIntoWords =
        line -> Arrays.asList(line.split(" ")).iterator();
    Dataset<String> words = lines.flatMap(splitIntoWords, Encoders.STRING());
    Dataset<Row> wordCounts = words.groupBy("value").count();

    // Print the complete set of running counts to the console on each trigger.
    StreamingQuery runningQuery = wordCounts.writeStream()
        .outputMode("complete")
        .format("console")
        .option("checkpointLocation", checkpointDir)
        .start();

    // Block until the streaming query terminates.
    runningQuery.awaitTermination();
  }