in v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java [632:882]
public static PipelineResult run(Options options) {
/*
* Stages:
* 1) Ingest and Normalize Data to FailsafeElement with JSON Strings
* 2) Transform records
* 3) Write filtered records to GCS
* 4) Write transformed records to Cloud Spanner
* 5) Write Failures to GCS Dead Letter Queue
*/
Pipeline pipeline = Pipeline.create(options);
DeadLetterQueueManager dlqManager = buildDlqManager(options);
// Ingest session file into schema object.
Schema schema = SessionFileReader.read(options.getSessionFilePath());
/*
* Stage 1: Ingest/Normalize Data to FailsafeElement with JSON Strings and
* read Cloud Spanner information schema.
* a) Prepare spanner config and process information schema
* b) Read DataStream data from GCS into JSON String FailsafeElements
* c) Reconsume Dead Letter Queue data from GCS into JSON String FailsafeElements
* d) Flatten DataStream and DLQ Streams
*/
// Prepare Spanner config
SpannerConfig spannerConfig =
SpannerConfig.create()
.withProjectId(ValueProvider.StaticValueProvider.of(options.getProjectId()))
.withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
.withInstanceId(ValueProvider.StaticValueProvider.of(options.getInstanceId()))
.withDatabaseId(ValueProvider.StaticValueProvider.of(options.getDatabaseId()))
.withRpcPriority(ValueProvider.StaticValueProvider.of(options.getSpannerPriority()))
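// Commit retries are capped at a single attempt with a 4 minute timeout; retries are
// handled at the pipeline level via the retry Dead Letter Queue rather than by the Spanner client.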
.withCommitRetrySettings(
RetrySettings.newBuilder()
.setTotalTimeout(org.threeten.bp.Duration.ofMinutes(4))
.setInitialRetryDelay(org.threeten.bp.Duration.ofMinutes(0))
.setRetryDelayMultiplier(1)
.setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(0))
.setInitialRpcTimeout(org.threeten.bp.Duration.ofMinutes(4))
.setRpcTimeoutMultiplier(1)
.setMaxRpcTimeout(org.threeten.bp.Duration.ofMinutes(4))
.setMaxAttempts(1)
.build());
// TODO: spannerConfig = SpannerServiceFactoryImpl.createSpannerService(spannerConfig,
// <failureinjectionparameter>);
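// Shadow table writes may use a separate Spanner config (e.g. a different database), resolved
// from pipeline options.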
SpannerConfig shadowTableSpannerConfig = getShadowTableSpannerConfig(options);
/* Process information schema
* 1) Read information schema from destination Cloud Spanner database
* 2) Check if shadow tables are present and create if necessary
* 3) Return new information schema
*/
PCollectionTuple ddlTuple =
pipeline.apply(
"Process Information Schema",
new ProcessInformationSchema(
spannerConfig,
shadowTableSpannerConfig,
options.getShouldCreateShadowTables(),
options.getShadowTablePrefix(),
options.getDatastreamSourceType()));
PCollectionView<Ddl> ddlView =
ddlTuple
.get(ProcessInformationSchema.MAIN_DDL_TAG)
.apply("Cloud Spanner Main DDL as view", View.asSingleton());
PCollectionView<Ddl> shadowTableDdlView =
ddlTuple
.get(ProcessInformationSchema.SHADOW_TABLE_DDL_TAG)
.apply("Cloud Spanner shadow tables DDL as view", View.asSingleton());
PCollection<FailsafeElement<String, String>> jsonRecords = null;
// Elements previously sent to the Dead Letter Queue are reconsumed here.
// The DLQManager built above from PipelineOptions is in charge of building pieces of the DLQ.
PCollectionTuple reconsumedElements = null;
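// In regular mode the pipeline consumes both the Datastream output and DLQ retries; in any other
// run mode (DLQ retry flow) only DLQ records are processed.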
boolean isRegularMode = "regular".equals(options.getRunMode());
if (isRegularMode && (!Strings.isNullOrEmpty(options.getDlqGcsPubSubSubscription()))) {
reconsumedElements =
dlqManager.getReconsumerDataTransformForFiles(
pipeline.apply(
"Read retry from PubSub",
new PubSubNotifiedDlqIO(
options.getDlqGcsPubSubSubscription(),
// file paths to ignore when re-consuming for retry
new ArrayList<String>(
Arrays.asList("/severe/", "/tmp_retry", "/tmp_severe/", ".temp")))));
} else {
reconsumedElements =
dlqManager.getReconsumerDataTransform(
pipeline.apply(dlqManager.dlqReconsumer(options.getDlqRetryMinutes())));
}
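// Retryable records recovered from the DLQ re-enter the main processing path below.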
PCollection<FailsafeElement<String, String>> dlqJsonRecords =
reconsumedElements
.get(DeadLetterQueueManager.RETRYABLE_ERRORS)
.setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
if (isRegularMode) {
LOG.info("Regular Datastream flow");
PCollection<FailsafeElement<String, String>> datastreamJsonRecords =
pipeline.apply(
new DataStreamIO(
options.getStreamName(),
options.getInputFilePattern(),
options.getInputFileFormat(),
options.getGcsPubSubSubscription(),
options.getRfcStartDateTime())
.withFileReadConcurrency(options.getFileReadConcurrency())
.withoutDatastreamRecordsReshuffle()
.withDirectoryWatchDuration(
Duration.standardMinutes(options.getDirectoryWatchDurationInMinutes())));
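// Fall back to a single worker when maxNumWorkers is unset (0) so the reshuffle bucket count
// below stays positive.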
int maxNumWorkers = options.getMaxNumWorkers() != 0 ? options.getMaxNumWorkers() : 1;
jsonRecords =
PCollectionList.of(datastreamJsonRecords)
.and(dlqJsonRecords)
.apply(Flatten.pCollections())
.apply(
"Reshuffle",
Reshuffle.<FailsafeElement<String, String>>viaRandomKey()
.withNumBuckets(
maxNumWorkers * DatastreamToSpannerConstants.MAX_DOFN_PER_WORKER));
} else {
LOG.info("DLQ retry flow");
jsonRecords =
PCollectionList.of(dlqJsonRecords)
.apply(Flatten.pCollections())
.apply("Reshuffle", Reshuffle.viaRandomKey());
}
/*
* Stage 2: Transform records
*/
// Ingest transformation context file into memory.
TransformationContext transformationContext =
TransformationContextReader.getTransformationContext(
options.getTransformationContextFilePath());
// Ingest sharding context file into memory.
ShardingContext shardingContext =
ShardingContextReader.getShardingContext(options.getShardingContextFilePath());
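// Configure any user-supplied custom transformation from pipeline options (jar path, class name
// and custom parameters).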
CustomTransformation customTransformation =
CustomTransformation.builder(
options.getTransformationJarPath(), options.getTransformationClassName())
.setCustomParameters(options.getTransformationCustomParameters())
.build();
// Create the overrides mapping.
ISchemaOverridesParser schemaOverridesParser = configureSchemaOverrides(options);
ChangeEventTransformerDoFn changeEventTransformerDoFn =
ChangeEventTransformerDoFn.create(
schema,
schemaOverridesParser,
transformationContext,
shardingContext,
options.getDatastreamSourceType(),
customTransformation,
options.getRoundJsonDecimals(),
ddlView,
spannerConfig);
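// The transformer emits three outputs: transformed events (written to Spanner in Stage 4),
// filtered events (written to GCS in Stage 3), and permanent errors (sent to the severe DLQ in
// Stage 5).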
PCollectionTuple transformedRecords =
jsonRecords.apply(
"Apply Transformation to events",
ParDo.of(changeEventTransformerDoFn)
.withSideInputs(ddlView)
.withOutputTags(
DatastreamToSpannerConstants.TRANSFORMED_EVENT_TAG,
TupleTagList.of(
Arrays.asList(
DatastreamToSpannerConstants.FILTERED_EVENT_TAG,
DatastreamToSpannerConstants.PERMANENT_ERROR_TAG))));
/*
* Stage 3: Write filtered records to GCS
*/
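// Normalize the temp location to end with a trailing slash so the default filtered-events
// directory concatenates cleanly.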
String tempLocation =
options.as(DataflowPipelineOptions.class).getTempLocation().endsWith("/")
? options.as(DataflowPipelineOptions.class).getTempLocation()
: options.as(DataflowPipelineOptions.class).getTempLocation() + "/";
String filterEventsDirectory =
options.getFilteredEventsDirectory().isEmpty()
? tempLocation + "filteredEvents/"
: options.getFilteredEventsDirectory();
LOG.info("Filtered events directory: {}", filterEventsDirectory);
transformedRecords
.get(DatastreamToSpannerConstants.FILTERED_EVENT_TAG)
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
.apply(
"Write Filtered Events To GCS",
TextIO.write().to(filterEventsDirectory).withSuffix(".json").withWindowedWrites());
/*
* Stage 4: Write transformed records to Cloud Spanner
*/
SpannerTransactionWriter.Result spannerWriteResults =
transformedRecords
.get(DatastreamToSpannerConstants.TRANSFORMED_EVENT_TAG)
.apply(
"Write events to Cloud Spanner",
new SpannerTransactionWriter(
spannerConfig,
shadowTableSpannerConfig,
ddlView,
shadowTableDdlView,
options.getShadowTablePrefix(),
options.getDatastreamSourceType(),
isRegularMode));
/*
* Stage 5: Write failures to GCS Dead Letter Queue
* a) Retryable errors are written to retry GCS Dead letter queue
* b) Severe errors are written to severe GCS Dead letter queue
*/
// Only the original payload from the failsafe event is written to the DLQ; this is done in
// StringDeadLetterQueueSanitizer.
spannerWriteResults
.retryableErrors()
.apply(
"DLQ: Write retryable Failures to GCS",
MapElements.via(new StringDeadLetterQueueSanitizer()))
.setCoder(StringUtf8Coder.of())
.apply(
"Write To DLQ",
DLQWriteTransform.WriteDLQ.newBuilder()
.withDlqDirectory(dlqManager.getRetryDlqDirectoryWithDateTime())
.withTmpDirectory(options.getDeadLetterQueueDirectory() + "/tmp_retry/")
.setIncludePaneInfo(true)
.build());
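// Records the DLQ reconsumer classifies as permanent errors are merged with permanent errors from
// the transformer and the Spanner writer, then written to the severe DLQ.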
PCollection<FailsafeElement<String, String>> dlqErrorRecords =
reconsumedElements
.get(DeadLetterQueueManager.PERMANENT_ERRORS)
.setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
// TODO: Write errors from transformer and spanner writer into separate folders
PCollection<FailsafeElement<String, String>> permanentErrors =
PCollectionList.of(dlqErrorRecords)
.and(spannerWriteResults.permanentErrors())
.and(transformedRecords.get(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG))
.apply(Flatten.pCollections());
// increment the metrics
permanentErrors
.apply("Update metrics", ParDo.of(new MetricUpdaterDoFn(isRegularMode)))
.apply(
"DLQ: Write Severe errors to GCS",
MapElements.via(new StringDeadLetterQueueSanitizer()))
.setCoder(StringUtf8Coder.of())
.apply(
"Write To DLQ",
DLQWriteTransform.WriteDLQ.newBuilder()
.withDlqDirectory(dlqManager.getSevereDlqDirectoryWithDateTime())
.withTmpDirectory(options.getDeadLetterQueueDirectory() + "/tmp_severe/")
.setIncludePaneInfo(true)
.build());
// Execute the pipeline and return the result.
return pipeline.run();
}