in java/src/main/java/com/google/cloud/dataproc/templates/pubsublite/PubSubLiteToBigTable.java [69:133]
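  /**
   * Runs a streaming query that reads messages from a Pub/Sub Lite subscription and, for each
   * micro-batch, writes every JSON payload to a Bigtable table as one row mutation.
   */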
  public void runTemplate()
      throws InterruptedException, TimeoutException, StreamingQueryException {

    // Initialize the Spark session
    SparkSession spark =
        SparkSession.builder().appName("Spark PubSubLiteToBigTable Demo Job").getOrCreate();
    // Stream data from the Pub/Sub Lite subscription
    Dataset<Row> df =
        spark
            .readStream()
            .format(PUBSUBLITE_FORMAT)
            .option(PUBSUBLITE_SUBSCRIPTION, pubsubInputSubscription)
            .load();
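    // Per the Pub/Sub Lite Spark connector docs, messages arrive with a fixed schema
    // whose binary "data" column carries the message payload.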
    // Cast the binary payload to a string so it can be parsed as JSON below.
    df = df.withColumn("data", df.col("data").cast(DataTypes.StringType));
    StreamingQuery query =
        df.writeStream()
            .foreachBatch(
                (VoidFunction2<Dataset<Row>, Long>)
                    (rowDataset, batchId) -> {
                      rowDataset.foreachPartition(
                          (ForeachPartitionFunction<Row>)
                              rows -> {
                                // One Bigtable client per partition; try-with-resources
                                // closes it even if a mutation fails mid-partition.
                                try (BigtableDataClient dataClient =
                                    BigtableDataClient.create(
                                        pubSubBigTableOutputProjectId,
                                        pubSubBigTableOutputInstanceId)) {
                                  while (rows.hasNext()) {
                                    // Bigtable cell timestamps are in microseconds.
                                    long timestamp = System.currentTimeMillis() * 1000;
                                    Row row = rows.next();
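                                    // Each payload is expected to carry a row key plus an
                                    // array of cells, keyed by the ROWKEY, COLUMNS,
                                    // COLUMN_FAMILY, COLUMN_NAME and COLUMN_VALUE constants
                                    // defined outside this excerpt.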
                                    JSONObject record =
                                        new JSONObject(row.getString(row.fieldIndex("data")));
                                    RowMutation rowMutation =
                                        RowMutation.create(
                                            pubSubBigTableOutputTable, record.getString(ROWKEY));
                                    JSONArray columnarray = record.getJSONArray(COLUMNS);
                                    for (int i = 0; i < columnarray.length(); i++) {
                                      rowMutation.setCell(
                                          columnarray.getJSONObject(i).getString(COLUMN_FAMILY),
                                          columnarray.getJSONObject(i).getString(COLUMN_NAME),
                                          timestamp,
                                          columnarray.getJSONObject(i).getString(COLUMN_VALUE));
                                    }
                                    dataClient.mutateRow(rowMutation);
                                  }
                                }
                              });
                    })
            // Emit one micro-batch every streamingDuration seconds; streaming progress
            // is tracked in the checkpoint location.
            .trigger(Trigger.ProcessingTime(streamingDuration, TimeUnit.SECONDS))
            .option(PUBSUBLITE_CHECKPOINT, pubsubCheckpointLocation)
            .start();
    // Let the query run until the configured timeout (60s), then shut down.
    query.awaitTermination(timeoutMs);
    query.stop();

    LOGGER.info("Job completed.");
    spark.stop();
  }
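
A minimal sketch of a conforming message payload, built with the same org.json types the template parses with. The literal key strings ("rowkey", "columns", "columnfamily", "columnname", "columnvalue") are hypothetical stand-ins for the template's ROWKEY, COLUMNS, COLUMN_FAMILY, COLUMN_NAME and COLUMN_VALUE constants, whose actual values are defined outside this excerpt:

import org.json.JSONArray;
import org.json.JSONObject;

public class SamplePayload {
  public static void main(String[] args) {
    // Hypothetical key names; substitute the template's actual constant values.
    JSONObject record =
        new JSONObject()
            .put("rowkey", "rk1")
            .put(
                "columns",
                new JSONArray()
                    .put(
                        new JSONObject()
                            .put("columnfamily", "cf")
                            .put("columnname", "field1")
                            .put("columnvalue", "value1")));
    // Publish record.toString() as the message body on the Pub/Sub Lite topic
    // backing the template's input subscription.
    System.out.println(record);
  }
}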