in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java [245:472]
public static PipelineResult run(SpannerChangeStreamsToBigQueryOptions options) {
setOptions(options);
validateOptions(options);
/**
 * Stages:
 * 1) Read {@link DataChangeRecord} from the change stream.
 * 2) Create a {@link FailsafeElement} of {@link Mod} JSON, merged from the {@link
 *    DataChangeRecord} stream and the GCS dead letter queue.
 * 3) Convert {@link Mod} JSON into a {@link TableRow} by reading from Spanner at the commit
 *    timestamp.
 * 4) Append the {@link TableRow} to BigQuery.
 * 5) Write failures from 2), 3) and 4) to the GCS dead letter queue.
 */
Pipeline pipeline = Pipeline.create(options);
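// The DLQ manager provides a retry directory (reconsumed by this pipeline) and a severe
// directory for records that should not be retried.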
DeadLetterQueueManager dlqManager = buildDlqManager(options);
String spannerProjectId = OptionsUtils.getSpannerProjectId(options);
String dlqDirectory = dlqManager.getRetryDlqDirectoryWithDateTime();
String tempDlqDirectory = dlqManager.getRetryDlqDirectory() + "tmp/";
/**
 * There are two types of errors that can occur in this pipeline:
 *
 * <p>1) Errors originating from modJsonStringToTableRow. Errors here are due to either primary
 * key values missing, a Spanner table / column missing from the in-memory map, or a Spanner
 * read error happening in readSpannerRow. The Spanner read error is already retried inline 3
 * times; the other types of errors are more likely to be non-retryable.
 *
 * <p>2) Errors originating from BigQueryIO.write. The BigQuery Storage Write API already
 * retries all transient errors and only outputs the more permanent errors.
 *
 * <p>As a result, it is reasonable to write all errors happening in the pipeline directly into
 * the permanent DLQ, since most of the errors are likely to be non-transient.
 */
if (options.getDisableDlqRetries()) {
LOG.info(
"Disabling retries for the DLQ, directly writing into severe DLQ: {}",
dlqManager.getSevereDlqDirectoryWithDateTime());
dlqDirectory = dlqManager.getSevereDlqDirectoryWithDateTime();
tempDlqDirectory = dlqManager.getSevereDlqDirectory() + "tmp/";
}
// Retrieve and parse the startTimestamp and endTimestamp.
Timestamp startTimestamp =
options.getStartTimestamp().isEmpty()
? Timestamp.now()
: Timestamp.parseTimestamp(options.getStartTimestamp());
Timestamp endTimestamp =
options.getEndTimestamp().isEmpty()
? Timestamp.MAX_VALUE
: Timestamp.parseTimestamp(options.getEndTimestamp());
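// This Spanner configuration is shared by the change stream read and by the point-in-time
// Spanner reads performed when converting Mod JSON to TableRows.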
SpannerConfig spannerConfig =
SpannerConfig.create()
.withProjectId(spannerProjectId)
.withInstanceId(options.getSpannerInstanceId())
.withDatabaseId(options.getSpannerDatabase())
.withRpcPriority(options.getRpcPriority());
if (!Strings.isNullOrEmpty(options.getSpannerHost())) {
spannerConfig =
spannerConfig.withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()));
}
// Propagate database role for fine-grained access control on change stream.
if (options.getSpannerDatabaseRole() != null) {
spannerConfig =
spannerConfig.withDatabaseRole(
ValueProvider.StaticValueProvider.of(options.getSpannerDatabaseRole()));
}
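// Read the change stream between the inclusive start and end timestamps parsed above.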
SpannerIO.ReadChangeStream readChangeStream =
SpannerIO.readChangeStream()
.withSpannerConfig(spannerConfig)
.withMetadataInstance(options.getSpannerMetadataInstanceId())
.withMetadataDatabase(options.getSpannerMetadataDatabase())
.withChangeStreamName(options.getSpannerChangeStreamName())
.withInclusiveStartAt(startTimestamp)
.withInclusiveEndAt(endTimestamp)
.withRpcPriority(options.getRpcPriority());
String spannerMetadataTableName = options.getSpannerMetadataTableName();
if (spannerMetadataTableName != null) {
readChangeStream = readChangeStream.withMetadataTable(spannerMetadataTableName);
}
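// Stage 1: read DataChangeRecords and reshuffle them to redistribute work before the
// downstream transforms.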
PCollection<DataChangeRecord> dataChangeRecord =
pipeline
.apply("Read from Spanner Change Streams", readChangeStream)
.apply("Reshuffle DataChangeRecord", Reshuffle.viaRandomKey());
PCollection<FailsafeElement<String, String>> sourceFailsafeModJson =
dataChangeRecord
.apply("DataChangeRecord To Mod JSON", ParDo.of(new DataChangeRecordToModJsonFn()))
.apply(
"Wrap Mod JSON In FailsafeElement",
ParDo.of(
new DoFn<String, FailsafeElement<String, String>>() {
@ProcessElement
public void process(
@Element String input,
OutputReceiver<FailsafeElement<String, String>> receiver) {
receiver.output(FailsafeElement.of(input, input));
}
}))
.setCoder(FAILSAFE_ELEMENT_CODER);
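// Reconsume previously failed records from the DLQ and split them into retryable and
// permanent errors.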
PCollectionTuple dlqModJson =
dlqManager.getReconsumerDataTransform(
pipeline.apply(dlqManager.dlqReconsumer(options.getDlqRetryMinutes())));
PCollection<FailsafeElement<String, String>> retryableDlqFailsafeModJson =
dlqModJson.get(DeadLetterQueueManager.RETRYABLE_ERRORS).setCoder(FAILSAFE_ELEMENT_CODER);
PCollection<FailsafeElement<String, String>> failsafeModJson =
PCollectionList.of(sourceFailsafeModJson)
.and(retryableDlqFailsafeModJson)
.apply("Merge Source And DLQ Mod JSON", Flatten.pCollections());
ImmutableSet.Builder<String> ignoreFieldsBuilder = ImmutableSet.builder();
for (String ignoreField : options.getIgnoreFields().split(",")) {
ignoreFieldsBuilder.add(ignoreField);
}
ImmutableSet<String> ignoreFields = ignoreFieldsBuilder.build();
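// Stage 3: convert Mod JSON into TableRows by reading the full row from Spanner at the
// commit timestamp; failures are emitted on a separate dead letter output.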
FailsafeModJsonToTableRowTransformer.FailsafeModJsonToTableRowOptions
failsafeModJsonToTableRowOptions =
FailsafeModJsonToTableRowTransformer.FailsafeModJsonToTableRowOptions.builder()
.setSpannerConfig(spannerConfig)
.setSpannerChangeStream(options.getSpannerChangeStreamName())
.setIgnoreFields(ignoreFields)
.setCoder(FAILSAFE_ELEMENT_CODER)
.setUseStorageWriteApi(options.getUseStorageWriteApi())
.build();
FailsafeModJsonToTableRowTransformer.FailsafeModJsonToTableRow failsafeModJsonToTableRow =
new FailsafeModJsonToTableRowTransformer.FailsafeModJsonToTableRow(
failsafeModJsonToTableRowOptions);
PCollectionTuple tableRowTuple =
failsafeModJson.apply("Mod JSON To TableRow", failsafeModJsonToTableRow);
// If users pass in the full BigQuery dataset ID (projectId.datasetName), extract the dataset
// name for the setBigQueryDataset parameter.
List<String> results = OptionsUtils.processBigQueryProjectAndDataset(options);
String bigqueryProject = results.get(0);
String bigqueryDataset = results.get(1);
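// Dynamic destinations route each changelog row to a BigQuery table derived from the
// configured changelog table name template.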
BigQueryDynamicDestinations.BigQueryDynamicDestinationsOptions
bigQueryDynamicDestinationsOptions =
BigQueryDynamicDestinations.BigQueryDynamicDestinationsOptions.builder()
.setSpannerConfig(spannerConfig)
.setChangeStreamName(options.getSpannerChangeStreamName())
.setIgnoreFields(ignoreFields)
.setBigQueryProject(bigqueryProject)
.setBigQueryDataset(bigqueryDataset)
.setBigQueryTableTemplate(options.getBigQueryChangelogTableNameTemplate())
.setUseStorageWriteApi(options.getUseStorageWriteApi())
.build();
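// Stage 4: append TableRows to BigQuery. The Storage Write API branch additionally ignores
// unknown values and enables automatic schema updates.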
WriteResult writeResult;
if (!options.getUseStorageWriteApi()) {
writeResult =
tableRowTuple
.get(failsafeModJsonToTableRow.transformOut)
.apply(
"Write To BigQuery",
BigQueryIO.<TableRow>write()
.to(BigQueryDynamicDestinations.of(bigQueryDynamicDestinationsOptions))
.withFormatFunction(element -> removeIntermediateMetadataFields(element))
.withFormatRecordOnFailureFunction(element -> element)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
.withExtendedErrorInfo()
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
} else {
writeResult =
tableRowTuple
.get(failsafeModJsonToTableRow.transformOut)
.apply(
"Write To BigQuery",
BigQueryIO.<TableRow>write()
.to(BigQueryDynamicDestinations.of(bigQueryDynamicDestinationsOptions))
.withFormatFunction(element -> removeIntermediateMetadataFields(element))
.withFormatRecordOnFailureFunction(element -> element)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
.ignoreUnknownValues()
// Auto schema update is only supported when using STORAGE_WRITE_API or
// STORAGE_API_AT_LEAST_ONCE.
.withAutoSchemaUpdate(true)
.withExtendedErrorInfo()
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
}
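// Stage 5: sanitize failures from the TableRow transform and from BigQuery writes, then
// write them to the DLQ (the severe DLQ when retries are disabled).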
PCollection<String> transformDlqJson =
tableRowTuple
.get(failsafeModJsonToTableRow.transformDeadLetterOut)
.apply(
"Failed Mod JSON During Table Row Transformation",
MapElements.via(new StringDeadLetterQueueSanitizer()));
PCollection<String> bqWriteDlqJson =
BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
.apply(
"Failed Mod JSON During BigQuery Writes",
MapElements.via(new BigQueryDeadLetterQueueSanitizer()));
// Generally the BigQuery Storage Write API retries transient errors, so only the more
// persistent errors make it into the DLQ.
PCollectionList.of(transformDlqJson)
.and(bqWriteDlqJson)
.apply("Merge Failed Mod JSON From Transform And BigQuery", Flatten.pCollections())
.apply(
"Write Failed Mod JSON To DLQ",
DLQWriteTransform.WriteDLQ.newBuilder()
.withDlqDirectory(dlqDirectory)
.withTmpDirectory(tempDlqDirectory)
.setIncludePaneInfo(true)
.build());
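// Records from the DLQ that are classified as permanent errors are written directly to the
// severe DLQ without further retries.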
PCollection<FailsafeElement<String, String>> nonRetryableDlqModJsonFailsafe =
dlqModJson.get(DeadLetterQueueManager.PERMANENT_ERRORS).setCoder(FAILSAFE_ELEMENT_CODER);
nonRetryableDlqModJsonFailsafe
.apply(
"Write Mod JSON With Non-retryable Error To DLQ",
MapElements.via(new StringDeadLetterQueueSanitizer()))
.setCoder(StringUtf8Coder.of())
.apply(
DLQWriteTransform.WriteDLQ.newBuilder()
.withDlqDirectory(dlqManager.getSevereDlqDirectoryWithDateTime())
.withTmpDirectory(dlqManager.getSevereDlqDirectory() + "tmp/")
.setIncludePaneInfo(true)
.build());
return pipeline.run();
}