in projects/dataflow-bigquery-change-data-capture/src/main/java/com/google/cloud/dataflow/SpannerToBigQueryUsingCdc.java [116:195]
/**
 * Assembles the Spanner-to-BigQuery change data capture pipeline and starts it.
 *
 * <p>The pipeline built here:
 *
 * <ol>
 *   <li>reads data change records from the configured Spanner change stream, starting at the
 *       moment this method is invoked;
 *   <li>converts each record to an {@code OrderMutation} and appends it to the orders table
 *       using the BigQuery Storage API in at-least-once mode, attaching each element's row
 *       mutation information;
 *   <li>hands failed Storage API inserts to {@code BigQueryFailedInsertProcessor};
 *   <li>derives sync points from the orders write result, logs them, and upserts them into the
 *       sync point table under the table name {@code "order"}.
 * </ol>
 *
 * <p>Both destination tables must already exist: the writes use {@code CREATE_NEVER}.
 *
 * @param options pipeline options supplying the Spanner source, the BigQuery destinations and
 *     the sync point detection settings
 * @param p the pipeline to populate and run
 */
private static void run(Options options, Pipeline p) {
  SpannerConfig sourceDatabase =
      SpannerConfig.create()
          .withProjectId(options.getSpannerProjectId())
          .withInstanceId(options.getSpannerInstanceId())
          .withDatabaseId(options.getSpannerDatabaseId());

  // Captured once so the change stream start and the sync point generator share the same origin.
  Timestamp streamStart = Timestamp.now();

  PCollection<DataChangeRecord> changeRecords =
      p.apply(
          "Read Change Stream",
          SpannerIO.readChangeStream()
              .withSpannerConfig(sourceDatabase)
              .withChangeStreamName(options.getSpannerOrdersStreamId())
              .withRpcPriority(RpcPriority.MEDIUM)
              .withInclusiveStartAt(streamStart));

  TableReference ordersTable =
      new TableReference()
          .setProjectId(options.getBigQueryProjectId())
          .setTableId(options.getBigQueryOrdersTableName())
          .setDatasetId(options.getBigQueryDataset());

  PCollection<OrderMutation> orderMutations =
      changeRecords
          .apply("To OrderMutations", ParDo.of(new DataChangeRecordToOrderMutation()))
          .setCoder(new OrderMutationCoder());

  // Successful writes are propagated because the sync point generator consumes the write result.
  BigQueryIO.Write<OrderMutation> ordersSink =
      BigQueryIO.<OrderMutation>write()
          .to(ordersTable)
          .withCreateDisposition(CreateDisposition.CREATE_NEVER)
          .withWriteDisposition(WriteDisposition.WRITE_APPEND)
          .withMethod(Write.Method.STORAGE_API_AT_LEAST_ONCE)
          .withPropagateSuccessfulStorageApiWrites(true)
          .withFormatFunction(new OrderMutationToTableRow())
          .withRowMutationInformationFn(OrderMutation::getMutationInformation);

  WriteResult ordersWriteResult = orderMutations.apply("Store Orders", ordersSink);

  ordersWriteResult
      .getFailedStorageApiInserts()
      .apply("Validate no orders failed", new BigQueryFailedInsertProcessor());

  Duration detectionFrequency =
      Duration.standardSeconds(options.getSyncPointDetectionFrequencyInSeconds());
  Duration detectionLateness =
      Duration.standardSeconds(options.getSyncPointDetectionLatenessInSeconds());
  // Only the whole-second part of the start timestamp is handed to the generator.
  Instant firstSyncPoint = Instant.ofEpochSecond(streamStart.getSeconds());

  PCollection<Instant> syncPoints =
      BigQueryIoSyncPointGenerator.generate(
          ordersWriteResult, detectionFrequency, detectionLateness, firstSyncPoint);

  syncPoints.apply("Log SyncPoints", ParDo.of(new LogSyncPoints()));

  TableReference syncPointTable =
      new TableReference()
          .setProjectId(options.getBigQueryProjectId())
          .setTableId(options.getBigQuerySyncPointTableName())
          .setDatasetId(options.getBigQueryDataset());

  // Each sync point is an UPSERT sequenced by its own epoch milliseconds.
  BigQueryIO.Write<Instant> syncPointSink =
      BigQueryIO.<Instant>write()
          .to(syncPointTable)
          .withCreateDisposition(CreateDisposition.CREATE_NEVER)
          .withWriteDisposition(WriteDisposition.WRITE_APPEND)
          .withMethod(Write.Method.STORAGE_API_AT_LEAST_ONCE)
          .withFormatFunction(
              syncPoint ->
                  new TableRow().set("table_name", "order").set("sync_point", syncPoint))
          .withRowMutationInformationFn(
              syncPoint -> RowMutationInformation.of(MutationType.UPSERT, syncPoint.getMillis()));

  WriteResult syncPointWriteResult = syncPoints.apply("Store Sync Point", syncPointSink);

  syncPointWriteResult
      .getFailedStorageApiInserts()
      .apply("Validate no sync points failed", new BigQueryFailedInsertProcessor());

  p.run();
}