in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/bigtablechangestreamstopubsub/BigtableChangeStreamsToPubSub.java [174:357]
/**
 * Builds and launches the Bigtable change streams to Pub/Sub pipeline.
 *
 * <p>Before any pipeline graph is constructed, the configured Pub/Sub topic is fetched and its
 * schema is checked against the configured message format, so misconfiguration fails at launch.
 *
 * @param options the template options
 * @return the result of {@link Pipeline#run()}
 * @throws RuntimeException if the Pub/Sub topic cannot be fetched
 * @throws IllegalArgumentException if schema validation fails with an error, or the topic does
 *     not accept messages of the configured format
 */
public static PipelineResult run(BigtableChangeStreamsToPubSubOptions options) {
  setOptions(options);
  validateOptions(options);
  String bigtableProject = getBigtableProjectId(options);

  // Retrieve and parse the startTimestamp. A null or blank value means "start from now"; this
  // uses the same blank check as the metadata table id option below.
  String startTimestampOption = options.getBigtableChangeStreamStartTimestamp();
  Instant startTimestamp =
      StringUtils.isBlank(startTimestampOption)
          ? Instant.now()
          : toInstant(Timestamp.parseTimestamp(startTimestampOption));

  BigtableSource sourceInfo =
      new BigtableSource(
          options.getBigtableReadInstanceId(),
          options.getBigtableReadTableId(),
          getBigtableCharset(options),
          options.getBigtableChangeStreamIgnoreColumnFamilies(),
          options.getBigtableChangeStreamIgnoreColumns(),
          startTimestamp);

  // Fetch the destination topic up front; the admin client is closed by try-with-resources.
  final Topic topic;
  try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
    GetTopicRequest request =
        GetTopicRequest.newBuilder()
            .setTopic(
                TopicName.ofProjectTopicName(getPubSubProjectId(options), options.getPubSubTopic())
                    .toString())
            .build();
    topic = topicAdminClient.getTopic(request);
  } catch (Exception e) {
    throw new RuntimeException("Failed to fetch Pub/Sub topic " + options.getPubSubTopic(), e);
  }

  // Only the validateSchema call itself is wrapped. The mismatch check is done outside the try,
  // so its IllegalArgumentException reaches the caller directly instead of being caught and
  // re-wrapped in a second IllegalArgumentException.
  boolean topicAcceptsFormat;
  try {
    topicAcceptsFormat = validateSchema(topic, options, sourceInfo);
  } catch (Exception e) {
    throw new IllegalArgumentException(e);
  }
  if (!topicAcceptsFormat) {
    final String errorMessage = "Configured topic doesn't accept messages of configured format";
    throw new IllegalArgumentException(errorMessage);
  }

  PubSubDestination destinationInfo = newPubSubDestination(options, topic);
  PubSubUtils pubSub = new PubSubUtils(sourceInfo, destinationInfo);

  /*
   * Stages:
   * 1) Read {@link ChangeStreamMutation} from change stream.
   * 2) Create {@link FailsafeElement} of {@link Mod} JSON and merge from:
   *    - {@link ChangeStreamMutation}.
   *    - GCS Dead letter queue.
   * 3) Convert {@link Mod} JSON into PubsubMessage and publish it to PubSub.
   * 4) Write Failures from 2) and 3) to GCS dead letter queue.
   */
  // Step 1
  Pipeline pipeline = Pipeline.create(options);
  // Register the coders for pipeline
  CoderRegistry coderRegistry = pipeline.getCoderRegistry();
  coderRegistry.registerCoderForType(
      FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);

  DeadLetterQueueManager dlqManager = buildDlqManager(options);

  // Failures go to the retry DLQ by default, or straight to the severe DLQ when retries are off.
  String dlqDirectory = dlqManager.getRetryDlqDirectoryWithDateTime();
  String tempDlqDirectory = dlqManager.getRetryDlqDirectory() + "tmp/";
  if (options.getDisableDlqRetries()) {
    LOG.info(
        "Disabling retries for the DLQ, directly writing into severe DLQ: {}",
        dlqManager.getSevereDlqDirectoryWithDateTime());
    dlqDirectory = dlqManager.getSevereDlqDirectoryWithDateTime();
    tempDlqDirectory = dlqManager.getSevereDlqDirectory() + "tmp/";
  }

  BigtableIO.ReadChangeStream readChangeStream =
      BigtableIO.readChangeStream()
          .withChangeStreamName(options.getBigtableChangeStreamName())
          .withExistingPipelineOptions(
              options.getBigtableChangeStreamResume()
                  ? BigtableIO.ExistingPipelineOptions.RESUME_OR_FAIL
                  : BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS)
          .withProjectId(bigtableProject)
          .withMetadataTableInstanceId(options.getBigtableChangeStreamMetadataInstanceId())
          .withInstanceId(options.getBigtableReadInstanceId())
          .withTableId(options.getBigtableReadTableId())
          .withAppProfileId(options.getBigtableChangeStreamAppProfile())
          .withStartTime(startTimestamp);
  if (!StringUtils.isBlank(options.getBigtableChangeStreamMetadataTableTableId())) {
    readChangeStream =
        readChangeStream.withMetadataTableTableId(
            options.getBigtableChangeStreamMetadataTableTableId());
  }

  // Step 2: just return the output for sending to pubSub and DLQ
  PCollection<ChangeStreamMutation> dataChangeRecord =
      pipeline
          .apply("Read from Cloud Bigtable Change Streams", readChangeStream)
          .apply(Values.create());

  PCollection<FailsafeElement<String, String>> sourceFailsafeModJson =
      dataChangeRecord
          .apply(
              "ChangeStreamMutation To Mod JSON",
              ParDo.of(new ChangeStreamMutationToModJsonFn(sourceInfo)))
          .apply(
              "Wrap Mod JSON In FailsafeElement",
              ParDo.of(
                  new DoFn<String, FailsafeElement<String, String>>() {
                    @ProcessElement
                    public void process(
                        @Element String input,
                        OutputReceiver<FailsafeElement<String, String>> receiver) {
                      // Original payload and current payload start out identical.
                      receiver.output(FailsafeElement.of(input, input));
                    }
                  }))
          .setCoder(FAILSAFE_ELEMENT_CODER);

  PCollectionTuple dlqModJson =
      dlqManager.getReconsumerDataTransform(
          pipeline.apply(dlqManager.dlqReconsumer(options.getDlqRetryMinutes())));

  // With retries disabled, retryable records from the DLQ reconsumer are not merged back into the
  // publish path; an empty collection takes their place in the Flatten below.
  final PCollection<FailsafeElement<String, String>> retryableDlqFailsafeModJson;
  if (options.getDisableDlqRetries()) {
    retryableDlqFailsafeModJson = pipeline.apply(Create.empty(FAILSAFE_ELEMENT_CODER));
  } else {
    retryableDlqFailsafeModJson =
        dlqModJson.get(DeadLetterQueueManager.RETRYABLE_ERRORS).setCoder(FAILSAFE_ELEMENT_CODER);
  }

  PCollection<FailsafeElement<String, String>> failsafeModJson =
      PCollectionList.of(sourceFailsafeModJson)
          .and(retryableDlqFailsafeModJson)
          .apply("Merge Source And DLQ Mod JSON", Flatten.pCollections());

  // Step 3: publish, collecting elements that failed to publish.
  FailsafePublisher.FailsafeModJsonToPubsubMessageOptions failsafeModJsonToPubsubOptions =
      FailsafePublisher.FailsafeModJsonToPubsubMessageOptions.builder()
          .setCoder(FAILSAFE_ELEMENT_CODER)
          .build();
  PublishModJsonToTopic publishModJsonToTopic =
      new PublishModJsonToTopic(pubSub, failsafeModJsonToPubsubOptions);
  PCollection<FailsafeElement<String, String>> failedToPublish =
      failsafeModJson.apply("Publish Mod JSON To Pubsub", publishModJsonToTopic);

  // Step 4: write failures to the (retry or severe) DLQ chosen above.
  // NOTE(review): the step names below (including the single-input Flatten) are kept verbatim;
  // presumably they must stay stable so in-place pipeline updates can match existing steps.
  PCollection<String> transformDlqJson =
      failedToPublish.apply(
          "Failed Mod JSON During Table Row Transformation",
          MapElements.via(new StringDeadLetterQueueSanitizer()));
  PCollectionList.of(transformDlqJson)
      .apply("Merge Failed Mod JSON From Transform And PubSub", Flatten.pCollections())
      .apply(
          "Write Failed Mod JSON To DLQ",
          DLQWriteTransform.WriteDLQ.newBuilder()
              .withDlqDirectory(dlqDirectory)
              .withTmpDirectory(tempDlqDirectory)
              .setIncludePaneInfo(true)
              .build());

  // Permanent errors re-consumed from the DLQ always go to the severe DLQ.
  PCollection<FailsafeElement<String, String>> nonRetryableDlqModJsonFailsafe =
      dlqModJson.get(DeadLetterQueueManager.PERMANENT_ERRORS).setCoder(FAILSAFE_ELEMENT_CODER);
  LOG.info(
      "DLQ manager severe DLQ directory with date time: {}",
      dlqManager.getSevereDlqDirectoryWithDateTime());
  LOG.info(
      "DLQ manager severe DLQ tmp directory: {}", dlqManager.getSevereDlqDirectory() + "tmp/");
  nonRetryableDlqModJsonFailsafe
      .apply(
          "Write Mod JSON With Non-retriable Error To DLQ",
          MapElements.via(new StringDeadLetterQueueSanitizer()))
      .setCoder(StringUtf8Coder.of())
      .apply(
          DLQWriteTransform.WriteDLQ.newBuilder()
              .withDlqDirectory(dlqManager.getSevereDlqDirectoryWithDateTime())
              .withTmpDirectory(dlqManager.getSevereDlqDirectory() + "tmp/")
              .setIncludePaneInfo(true)
              .build());

  return pipeline.run();
}