private static void run()

in projects/dataflow-bigquery-change-data-capture/src/main/java/com/google/cloud/dataflow/SpannerToBigQueryUsingCdc.java [116:195]


  /**
   * Assembles the change-data-capture pipeline on {@code p} and starts it.
   *
   * <p>The graph built here has three stages:
   *
   * <ol>
   *   <li>Read {@link DataChangeRecord}s from the Spanner change stream named by the options,
   *       starting (inclusively) at the moment this method is invoked.
   *   <li>Convert each record to an {@link OrderMutation} and write it to the BigQuery orders
   *       table through the Storage Write API (at-least-once), attaching the per-row mutation
   *       information supplied by the {@link OrderMutation} itself.
   *   <li>Derive sync points from the orders write result, log them, and upsert them into the
   *       BigQuery sync point table under the fixed {@code table_name} value {@code "order"}.
   * </ol>
   *
   * <p>Both destination tables must already exist ({@code CREATE_NEVER}). Failed Storage API
   * inserts from either write are handed to a {@link BigQueryFailedInsertProcessor}.
   *
   * @param options pipeline options providing the Spanner source, the BigQuery destinations and
   *     the sync point detection settings
   * @param p the pipeline to attach the transforms to; it is run before this method returns and
   *     the result of {@code p.run()} is not inspected
   */
  private static void run(Options options, Pipeline p) {
    SpannerConfig sourceDatabase =
        SpannerConfig.create()
            .withProjectId(options.getSpannerProjectId())
            .withInstanceId(options.getSpannerInstanceId())
            .withDatabaseId(options.getSpannerDatabaseId());

    // Captured once so the change stream start and the sync point generator share the same origin.
    Timestamp streamStart = Timestamp.now();

    SpannerIO.ReadChangeStream changeStreamSource =
        SpannerIO.readChangeStream()
            .withSpannerConfig(sourceDatabase)
            .withChangeStreamName(options.getSpannerOrdersStreamId())
            .withRpcPriority(RpcPriority.MEDIUM)
            .withInclusiveStartAt(streamStart);
    PCollection<DataChangeRecord> changeRecords = p.apply("Read Change Stream", changeStreamSource);

    // ---- Orders: change records -> OrderMutation -> BigQuery orders table ----
    TableReference ordersTable =
        new TableReference()
            .setProjectId(options.getBigQueryProjectId())
            .setDatasetId(options.getBigQueryDataset())
            .setTableId(options.getBigQueryOrdersTableName());

    PCollection<OrderMutation> orderMutations =
        changeRecords
            .apply("To OrderMutations", ParDo.of(new DataChangeRecordToOrderMutation()))
            .setCoder(new OrderMutationCoder());

    BigQueryIO.Write<OrderMutation> ordersSink =
        BigQueryIO.<OrderMutation>write()
            .to(ordersTable)
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_API_AT_LEAST_ONCE)
            // Successful writes are propagated because the sync point generator below
            // consumes this write result.
            .withPropagateSuccessfulStorageApiWrites(true)
            .withFormatFunction(new OrderMutationToTableRow())
            .withRowMutationInformationFn(mutation -> mutation.getMutationInformation());

    WriteResult ordersWrite = orderMutations.apply("Store Orders", ordersSink);

    ordersWrite
        .getFailedStorageApiInserts()
        .apply("Validate no orders failed", new BigQueryFailedInsertProcessor());

    // ---- Sync points: derived from the orders write result ----
    Duration detectionFrequency =
        Duration.standardSeconds(options.getSyncPointDetectionFrequencyInSeconds());
    Duration detectionLateness =
        Duration.standardSeconds(options.getSyncPointDetectionLatenessInSeconds());
    // Only whole seconds of the stream start are carried over; the nanosecond part is dropped.
    Instant streamStartInstant = Instant.ofEpochSecond(streamStart.getSeconds());

    PCollection<Instant> syncPoints =
        BigQueryIoSyncPointGenerator.generate(
            ordersWrite, detectionFrequency, detectionLateness, streamStartInstant);
    syncPoints.apply("Log SyncPoints", ParDo.of(new LogSyncPoints()));

    TableReference syncPointTable =
        new TableReference()
            .setProjectId(options.getBigQueryProjectId())
            .setDatasetId(options.getBigQueryDataset())
            .setTableId(options.getBigQuerySyncPointTableName());

    BigQueryIO.Write<Instant> syncPointSink =
        BigQueryIO.<Instant>write()
            .to(syncPointTable)
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_API_AT_LEAST_ONCE)
            .withFormatFunction(
                instant -> new TableRow().set("table_name", "order").set("sync_point", instant))
            // Upsert, with the sync point's epoch millis passed as the sequence number.
            .withRowMutationInformationFn(
                instant -> RowMutationInformation.of(MutationType.UPSERT, instant.getMillis()));

    WriteResult syncPointWrite = syncPoints.apply("Store Sync Point", syncPointSink);

    syncPointWrite
        .getFailedStorageApiInserts()
        .apply("Validate no sync points failed", new BigQueryFailedInsertProcessor());

    p.run();
  }