public void runTemplate()

in java/src/main/java/com/google/cloud/dataproc/templates/pubsublite/PubSubLiteToBigTable.java [69:133]


  /**
   * Runs the Pub/Sub Lite to Bigtable streaming template.
   *
   * <p>Reads messages from a Pub/Sub Lite subscription as a Spark structured stream and, for each
   * micro-batch partition, writes the records into a Cloud Bigtable table. Each message payload is
   * expected to be a JSON object containing a row key ({@code ROWKEY}) and an array of column
   * definitions ({@code COLUMNS}), each carrying a column family, column name, and value.
   *
   * @throws InterruptedException if waiting on the streaming query is interrupted
   * @throws TimeoutException if the streaming query cannot be started within Spark's timeout
   * @throws StreamingQueryException if the streaming query terminates with an error
   */
  public void runTemplate() throws InterruptedException, TimeoutException, StreamingQueryException {

    // Initialize the Spark session.
    SparkSession spark =
        SparkSession.builder().appName("Spark PubSubLiteToGCS Demo Job").getOrCreate();

    // Stream data from the Pub/Sub Lite subscription.
    Dataset<Row> df =
        spark
            .readStream()
            .format(PUBSUBLITE_FORMAT)
            .option(PUBSUBLITE_SUBSCRIPTION, pubsubInputSubscription)
            .load();

    // The message payload arrives as bytes; cast to String so it can be parsed as JSON below.
    df = df.withColumn("data", df.col("data").cast(DataTypes.StringType));

    StreamingQuery query =
        df.writeStream()
            .foreachBatch(
                (VoidFunction2<Dataset<Row>, Long>)
                    (rowDataset, batchId) -> {
                      rowDataset.foreachPartition(
                          (ForeachPartitionFunction<Row>)
                              rows -> {
                                // try-with-resources guarantees the Bigtable client is closed
                                // even if a record in this partition fails to parse or write
                                // (the original leaked the client on any exception).
                                try (BigtableDataClient dataClient =
                                    BigtableDataClient.create(
                                        pubSubBigTableOutputProjectId,
                                        pubSubBigTableOutputInstanceId)) {
                                  while (rows.hasNext()) {
                                    // Bigtable cell timestamps are expressed in microseconds.
                                    long timestamp = System.currentTimeMillis() * 1000;
                                    Row row = rows.next();

                                    // Column index 4 holds the JSON message payload —
                                    // NOTE(review): confirm against the Pub/Sub Lite row schema.
                                    JSONObject record = new JSONObject(row.get(4).toString());
                                    RowMutation rowMutation =
                                        RowMutation.create(
                                            pubSubBigTableOutputTable, record.getString(ROWKEY));

                                    JSONArray columnarray = record.getJSONArray(COLUMNS);
                                    for (int i = 0; i < columnarray.length(); i++) {
                                      // Fetch the per-column JSON object once instead of
                                      // three times per iteration.
                                      JSONObject column = columnarray.getJSONObject(i);
                                      rowMutation.setCell(
                                          column.getString(COLUMN_FAMILY),
                                          column.getString(COLUMN_NAME),
                                          timestamp,
                                          column.getString(COLUMN_VALUE));
                                    }

                                    dataClient.mutateRow(rowMutation);
                                  }
                                }
                              });
                    })
            .trigger(Trigger.ProcessingTime(streamingDuration, TimeUnit.SECONDS))
            .option(PUBSUBLITE_CHECKPOINT, pubsubCheckpointLocation)
            .start();

    // Let the query run for the configured duration, then stop it gracefully.
    query.awaitTermination(timeoutMs); // 60s
    query.stop();

    LOGGER.info("Job completed.");
    spark.stop();
  }