in v2/datastream-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToBigQuery.java [418:614]
public static PipelineResult run(Options options) {
/*
* Stages:
* 1) Ingest and Normalize Data to FailsafeElement with JSON Strings
* 2) Write JSON Strings to TableRow Collection
* - Optionally apply a UDF
* 3) BigQuery Output of TableRow Data
* a) Map New Columns & Write to Staging Tables
* b) Map New Columns & Merge Staging to Target Table
* 4) Write Failures to GCS Dead Letter Queue
*/
Pipeline pipeline = Pipeline.create(options);
DeadLetterQueueManager dlqManager = buildDlqManager(options);
String bigqueryProjectId = getBigQueryProjectId(options);
String dlqDirectory = dlqManager.getRetryDlqDirectoryWithDateTime();
String tempDlqDir = dlqManager.getRetryDlqDirectory() + "tmp/";
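// Failsafe transformer that optionally applies a JavaScript or Python UDF and
// converts the JSON strings into TableRows (used in Stage 2).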
InputUDFToTableRow<String> failsafeTableRowTransformer =
new InputUDFToTableRow<String>(
options.getJavascriptTextTransformGcsPath(),
options.getJavascriptTextTransformFunctionName(),
options.getJavascriptTextTransformReloadIntervalMinutes(),
options.getPythonTextTransformGcsPath(),
options.getPythonTextTransformFunctionName(),
options.getRuntimeRetries(),
FAILSAFE_ELEMENT_CODER);
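// Stateful cleaner for Oracle-sourced rows, applied after the UDF in Stage 2;
// rows it fails to clean are routed to the DLQ in Stage 4.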
StatefulRowCleaner statefulCleaner = StatefulRowCleaner.of();
/*
* Stage 1: Ingest and Normalize Data to FailsafeElement with JSON Strings
* a) Read DataStream data from GCS into JSON String FailsafeElements (datastreamJsonRecords)
* b) Reconsume Dead Letter Queue data from GCS into JSON String FailsafeElements
* (dlqJsonRecords)
* c) Flatten DataStream and DLQ Streams (jsonRecords)
*/
PCollection<FailsafeElement<String, String>> datastreamJsonRecords =
pipeline.apply(
new DataStreamIO(
options.getStreamName(),
options.getInputFilePattern(),
options.getInputFileFormat(),
options.getGcsPubSubSubscription(),
options.getRfcStartDateTime())
.withFileReadConcurrency(options.getFileReadConcurrency()));
// Reconsume elements previously written to the Dead Letter Queue.
// The DLQManager built above from the pipeline options is responsible for
// constructing the DLQ paths; each reconsumed JSON string is re-wrapped in a
// FailsafeElement so it flows through the same transforms as fresh records.
PCollection<FailsafeElement<String, String>> dlqJsonRecords =
pipeline
.apply("DLQ Consumer/reader", dlqManager.dlqReconsumer(options.getDlqRetryMinutes()))
.apply(
"DLQ Consumer/cleaner",
ParDo.of(
new DoFn<String, FailsafeElement<String, String>>() {
@ProcessElement
public void process(
@Element String input,
OutputReceiver<FailsafeElement<String, String>> receiver) {
receiver.output(FailsafeElement.of(input, input));
}
}))
.setCoder(FAILSAFE_ELEMENT_CODER);
PCollection<FailsafeElement<String, String>> jsonRecords =
PCollectionList.of(datastreamJsonRecords)
.and(dlqJsonRecords)
.apply("Merge Datastream & DLQ", Flatten.pCollections());
/*
 * Stage 2: Write JSON Strings to TableRow PCollectionTuple
 * a) Optionally apply a JavaScript or Python UDF
 * b) Convert JSON String FailsafeElements to TableRows (tableRowRecords)
 * c) Clean Oracle-specific rows and reshuffle the successes (shuffledTableRows)
 */
PCollectionTuple tableRowRecords =
jsonRecords.apply("UDF to TableRow/udf", failsafeTableRowTransformer);
PCollectionTuple cleanedRows =
tableRowRecords
.get(failsafeTableRowTransformer.transformOut)
.apply("UDF to TableRow/Oracle Cleaner", statefulCleaner);
PCollection<TableRow> shuffledTableRows =
cleanedRows
.get(statefulCleaner.successTag)
.apply(
"UDF to TableRow/ReShuffle",
Reshuffle.<TableRow>viaRandomKey().withNumBuckets(100));
/*
 * Stage 3: BigQuery Output of TableRow Data
 * a) Map New Columns & Write to Staging Tables (writeResult)
 * b) Map New Columns & Merge Staging to Target Table (optional, applied only
 *    when applyMerge is enabled; the merge step produces no output PCollection)
 *
 * failsafe: writeResult.getFailedInsertsWithErr()
 */
// TODO(beam 2.23): InsertRetryPolicy should be CDC compliant
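// Columns listed in the ignoreFields option are excluded from the table
// mappings below and stripped from rows before they are written to BigQuery.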
Set<String> fieldsToIgnore = getFieldsToIgnore(options.getIgnoreFields());
WriteResult writeResult =
shuffledTableRows
.apply(
"Map to Staging Tables",
new DataStreamMapper(
options.as(GcpOptions.class),
options.getOutputProjectId(),
options.getOutputStagingDatasetTemplate(),
options.getOutputStagingTableNameTemplate())
.withDataStreamRootUrl(options.getDataStreamRootUrl())
.withDefaultSchema(BigQueryDefaultSchemas.DATASTREAM_METADATA_SCHEMA)
.withDayPartitioning(true)
.withIgnoreFields(fieldsToIgnore))
.apply(
"Write Successful Records",
BigQueryIO.<KV<TableId, TableRow>>write()
.to(new BigQueryDynamicConverters().bigQueryDynamicDestination())
.withFormatFunction(
element -> removeTableRowFields(element.getValue(), fieldsToIgnore))
.withFormatRecordOnFailureFunction(element -> element.getValue())
.withoutValidation()
.ignoreInsertIds()
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withExtendedErrorInfo() // takes effect only when Storage Write API is off
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
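// Optional Stage 3b: when applyMerge is enabled, periodically MERGE the
// staging tables into the replica (target) tables.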
if (options.getApplyMerge()) {
shuffledTableRows
.apply(
"Map To Replica Tables",
new DataStreamMapper(
options.as(GcpOptions.class),
options.getOutputProjectId(),
options.getOutputDatasetTemplate(),
options.getOutputTableNameTemplate())
.withDataStreamRootUrl(options.getDataStreamRootUrl())
.withDefaultSchema(BigQueryDefaultSchemas.DATASTREAM_METADATA_SCHEMA)
.withIgnoreFields(fieldsToIgnore))
.apply(
"BigQuery Merge/Build MergeInfo",
new MergeInfoMapper(
bigqueryProjectId,
options.getOutputStagingDatasetTemplate(),
options.getOutputStagingTableNameTemplate(),
options.getOutputDatasetTemplate(),
options.getOutputTableNameTemplate()))
.apply(
"BigQuery Merge/Merge into Replica Tables",
BigQueryMerger.of(
MergeConfiguration.bigQueryConfiguration()
.withProjectId(bigqueryProjectId)
.withMergeWindowDuration(
Duration.standardMinutes(options.getMergeFrequencyMinutes()))
.withMergeConcurrency(options.getMergeConcurrency())
.withPartitionRetention(options.getPartitionRetentionDays())));
}
/*
 * Stage 4: Write Failures to GCS Dead Letter Queue
 * a) UDF and TableRow conversion failures (udfDlqJson)
 * b) Oracle cleaner failures (rowCleanerJson)
 * c) BigQuery insert errors (bqWriteDlqJson)
 */
PCollection<String> udfDlqJson =
PCollectionList.of(tableRowRecords.get(failsafeTableRowTransformer.udfDeadletterOut))
.and(tableRowRecords.get(failsafeTableRowTransformer.transformDeadletterOut))
.apply("Transform Failures/Flatten", Flatten.pCollections())
.apply(
"Transform Failures/Sanitize",
MapElements.via(new StringDeadLetterQueueSanitizer()));
PCollection<String> rowCleanerJson =
cleanedRows
.get(statefulCleaner.failureTag)
.apply(
"Transform Failures/Oracle Cleaner Failures",
MapElements.via(new RowCleanerDeadLetterQueueSanitizer()));
PCollection<String> bqWriteDlqJson =
BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
.apply("BigQuery Failures", MapElements.via(new BigQueryDeadLetterQueueSanitizer()));
PCollectionList.of(udfDlqJson)
.and(rowCleanerJson)
.and(bqWriteDlqJson)
.apply("Write To DLQ/Flatten", Flatten.pCollections())
.apply(
"Write To DLQ/Writer",
DLQWriteTransform.WriteDLQ.newBuilder()
.withDlqDirectory(dlqDirectory)
.withTmpDirectory(tempDlqDir)
.setIncludePaneInfo(true)
.build());
// Execute the pipeline and return the result.
return pipeline.run();
}