in v2/pubsub-cdc-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/PubSubCdcToBigQuery.java [286:532]
public static PipelineResult run(Options options) {
Pipeline pipeline = Pipeline.create(options);
DeadLetterQueueManager dlqManager = buildDlqManager(options);
String gcsOutputDateTimeDirectory = null;
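// The retry DLQ output path is date-partitioned; the YYYY/MM/DD/HH/mm tokens are
// placeholders that WindowedFilenamePolicy resolves per window when DLQ files are written below.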
if (!Strings.isNullOrEmpty(options.getDeadLetterQueueDirectory())) {
gcsOutputDateTimeDirectory = dlqManager.getRetryDlqDirectory() + "YYYY/MM/DD/HH/mm/";
}
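// Register coders so CDC payloads and FailsafeElement wrappers can be serialized between stages.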
CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);
coderRegistry.registerCoderForType(
FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);
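// Build the transformer that optionally applies a JavaScript or Python UDF to each message
// payload and converts the result into a TableRow, tagging UDF and conversion failures separately.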
InputUDFToTableRow<String> failsafeTableRowTransformer =
new InputUDFToTableRow<String>(
options.getJavascriptTextTransformGcsPath(),
options.getJavascriptTextTransformFunctionName(),
options.getJavascriptTextTransformReloadIntervalMinutes(),
options.getPythonTextTransformGcsPath(),
options.getPythonTextTransformFunctionName(),
options.getRuntimeRetries(),
FAILSAFE_ELEMENT_CODER);
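// Resolve the BigQuery destination configuration: either dataset/table name templates
// or a single output table spec.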
BigQueryTableConfigManager bqConfigManager =
new BigQueryTableConfigManager(
options.as(GcpOptions.class).getProject(),
options.getOutputDatasetTemplate(),
options.getOutputTableNameTemplate(),
options.getOutputTableSpec());
/*
 * Steps:
 * 1) Read messages in from Pub/Sub
 * 2) Transform the PubsubMessages into TableRows
 * - Transform message payload via UDF
 * - Convert UDF result to TableRow objects
 * 3) Write successful records out to BigQuery
 * - Auto-map new tables and columns if enabled
 * - Write records to BigQuery tables
 * 4) Write failed records out to the GCS dead-letter queue when configured,
 * otherwise to a BigQuery deadletter table
 */
/*
* Step #1: Read messages in from Pub/Sub
*/
PCollection<PubsubMessage> messages =
pipeline.apply(
"ReadPubSubSubscription",
PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()));
PCollection<FailsafeElement<String, String>> jsonRecords;
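// When a dead-letter queue directory is configured, records previously written to the retry
// DLQ are re-read from GCS and merged with the live Pub/Sub stream for reprocessing.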
if (options.getDeadLetterQueueDirectory() != null) {
PCollection<FailsafeElement<String, String>> failsafeMessages =
messages.apply("ConvertPubSubToFailsafe", ParDo.of(new PubSubToFailSafeElement()));
PCollection<FailsafeElement<String, String>> dlqJsonRecords =
pipeline
.apply(dlqManager.dlqReconsumer())
.apply(
ParDo.of(
new DoFn<String, FailsafeElement<String, String>>() {
@ProcessElement
public void process(
@Element String input,
OutputReceiver<FailsafeElement<String, String>> receiver) {
receiver.output(FailsafeElement.of(input, input));
}
}))
.setCoder(FAILSAFE_ELEMENT_CODER);
jsonRecords =
PCollectionList.of(failsafeMessages).and(dlqJsonRecords).apply(Flatten.pCollections());
} else {
jsonRecords =
messages.apply("ConvertPubSubToFailsafe", ParDo.of(new PubSubToFailSafeElement()));
}
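// Redistribute elements across a bounded number of keys (threadCount) before applying the UDF,
// which caps the parallelism of the transformation stage.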
PCollectionTuple convertedTableRows =
jsonRecords
/*
* Step #2: Transform the PubsubMessages into TableRows
*/
.apply(
Reshuffle.<FailsafeElement<String, String>>viaRandomKey()
.withNumBuckets(options.getThreadCount()))
.apply("ApplyUdfAndConvertToTableRow", failsafeTableRowTransformer);
/*
* Step #3: Write the successful records out to BigQuery
* Either extract table destination only
* or extract table destination and auto-map new columns
*/
PCollection<KV<TableId, TableRow>> tableEvents;
if (options.getAutoMapTables()) {
tableEvents =
convertedTableRows
.get(failsafeTableRowTransformer.transformOut)
.apply(
"Map Data to BigQuery Tables",
new BigQueryMappers(bqConfigManager.getProjectId())
.buildBigQueryTableMapper(
bqConfigManager.getDatasetTemplate(), bqConfigManager.getTableTemplate())
.withDefaultSchemaFromGCS(options.getSchemaFilePath()));
} else {
tableEvents =
convertedTableRows
.get(failsafeTableRowTransformer.transformOut)
.apply(
"ExtractBigQueryTableDestination",
BigQueryDynamicConverters.extractTableRowDestination(
bqConfigManager.getProjectId(),
bqConfigManager.getDatasetTemplate(),
bqConfigManager.getTableTemplate()));
}
/*
* Step #3: Cont.
* - Write rows out to BigQuery
*/
// TODO(https://github.com/apache/beam/pull/12004): Switch out alwaysRetry
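// Stream rows into dynamically chosen destination tables. CREATE_NEVER expects the tables to
// exist (when auto-mapping is enabled, the mapper above manages table creation); extended error
// info is requested so failed inserts can be routed to the failure handling below.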
WriteResult writeResult =
tableEvents.apply(
"WriteSuccessfulRecords",
BigQueryIO.<KV<TableId, TableRow>>write()
.to(new BigQueryDynamicConverters().bigQueryDynamicDestination())
.withFormatFunction(element -> element.getValue())
.withoutValidation()
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withExtendedErrorInfo()
.withFailedInsertRetryPolicy(InsertRetryPolicy.alwaysRetry()));
/*
 * Step #4: Handle failures.
 * Elements that failed insertion into BigQuery, together with UDF and row-conversion failures,
 * are written to the GCS dead-letter queue when a DLQ directory is configured,
 * or to a BigQuery deadletter table otherwise.
 */
// TODO: Cover tableRowRecords.get(TRANSFORM_DEADLETTER_OUT) error values
if (options.getDeadLetterQueueDirectory() != null) {
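// Sanitize failed BigQuery inserts and write them to the windowed retry DLQ directory so the
// DLQ reconsumer can pick them up again.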
BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
.apply(
"DLQ: Write Insert Failures to GCS",
MapElements.via(new BigQueryDeadLetterQueueSanitizer()))
.apply(
"Creating " + options.getWindowDuration() + " Window",
Window.into(
FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
.apply(
"DLQ: Write File(s)",
TextIO.write()
.withWindowedWrites()
.withNumShards(20)
.to(
WindowedFilenamePolicy.writeWindowedFiles()
.withOutputDirectory(gcsOutputDateTimeDirectory)
.withOutputFilenamePrefix("error")
.withShardTemplate("-SSSSS-of-NNNNN")
.withSuffix(".json"))
.withTempDirectory(
FileBasedSink.convertToFileResourceIfPossible(
options.getDeadLetterQueueDirectory())));
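// Collect UDF and row-conversion failures and write them to the same retry DLQ location.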
PCollection<FailsafeElement<String, String>> transformDeadletter =
PCollectionList.of(
ImmutableList.of(
convertedTableRows.get(failsafeTableRowTransformer.udfDeadletterOut),
convertedTableRows.get(failsafeTableRowTransformer.transformDeadletterOut)))
.apply("Flatten", Flatten.pCollections())
.apply(
"Creating " + options.getWindowDuration() + " Window",
Window.into(
FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))));
PCollection<String> dlqWindowing =
transformDeadletter
.apply("Sanitize records", MapElements.via(new StringDeadLetterQueueSanitizer()))
.setCoder(StringUtf8Coder.of());
dlqWindowing.apply(
"DLQ: Write File(s)",
TextIO.write()
.withWindowedWrites()
.withNumShards(20)
.to(
WindowedFilenamePolicy.writeWindowedFiles()
.withOutputDirectory(gcsOutputDateTimeDirectory)
.withOutputFilenamePrefix("error")
.withShardTemplate("-SSSSS-of-NNNNN")
.withSuffix(".json"))
.withTempDirectory(
FileBasedSink.convertToFileResourceIfPossible(
gcsOutputDateTimeDirectory + "tmp/")));
} else {
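// Without a GCS dead-letter queue, wrap failed BigQuery inserts as FailsafeElements and route
// them, together with UDF/transform failures, to a BigQuery deadletter table.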
PCollection<FailsafeElement<String, String>> failedInserts =
BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
.apply(
"WrapInsertionErrors",
MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
.via(
(BigQueryInsertError e) -> BigQueryConverters.wrapBigQueryInsertError(e)))
.setCoder(FAILSAFE_ELEMENT_CODER);
/*
* Step #4: Write records that failed table row transformation
* or conversion out to BigQuery deadletter table.
*/
PCollectionList.of(
ImmutableList.of(
convertedTableRows.get(failsafeTableRowTransformer.udfDeadletterOut),
convertedTableRows.get(failsafeTableRowTransformer.transformDeadletterOut)))
.apply("Flatten", Flatten.pCollections())
.apply(
"WriteFailedRecords",
ErrorConverters.WriteStringMessageErrors.newBuilder()
.setErrorRecordsTable(
BigQueryConverters.maybeUseDefaultDeadletterTable(
options.getOutputDeadletterTable(),
bqConfigManager.getOutputTableSpec(),
DEFAULT_DEADLETTER_TABLE_SUFFIX))
.setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
.build());
// Step #4 (cont.): Write records that failed BigQuery insertion to the deadletter table
failedInserts.apply(
"WriteInsertionFailedRecords",
ErrorConverters.WriteStringMessageErrors.newBuilder()
.setErrorRecordsTable(
BigQueryConverters.maybeUseDefaultDeadletterTable(
options.getOutputDeadletterTable(),
bqConfigManager.getOutputTableSpec(),
DEFAULT_DEADLETTER_TABLE_SUFFIX))
.setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
.build());
}
return pipeline.run();
}
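/*
 * For context, a minimal sketch of how run() is typically invoked from the template's entry
 * point (illustrative only; the actual main() lives outside this excerpt and may differ):
 *
 *   public static void main(String[] args) {
 *     Options options =
 *         PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
 *     run(options);
 *   }
 */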