in v2/spanner-change-streams-to-sharded-file-sink/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToShardedFileSink.java [352:483]
/**
 * Builds and launches the Spanner change-streams-to-sharded-GCS pipeline.
 *
 * <p>Steps performed before the pipeline graph is built: read and validate the source shard list,
 * derive the sharding mode (single vs. multi shard), detect the metadata database dialect,
 * optionally load the session file / information schema (multi-shard only), validate the metadata
 * table suffix, and persist the job's start timestamp and window duration so the companion
 * GCSToSource template reads an identical configuration.
 *
 * @param options pipeline options supplied at template launch
 * @return the {@link PipelineResult} of the launched pipeline
 * @throws RuntimeException if the shard file is empty or the metadata table suffix contains
 *     characters other than alphanumerics and underscores
 */
public static PipelineResult run(Options options) {
  Pipeline pipeline = Pipeline.create(options);

  ShardFileReader shardFileReader = new ShardFileReader(new SecretManagerAccessorImpl());
  List<Shard> shards = shardFileReader.getOrderedShardDetails(options.getSourceShardsFilePath());
  if (shards == null || shards.isEmpty()) {
    throw new RuntimeException("The source shards file cannot be empty");
  }
  String shardingMode = Constants.SHARDING_MODE_SINGLE_SHARD;
  if (shards.size() > 1) {
    shardingMode = Constants.SHARDING_MODE_MULTI_SHARD;
  }

  // Prepare Spanner config for the source database and for the metadata database.
  SpannerConfig spannerConfig =
      SpannerConfig.create()
          .withProjectId(ValueProvider.StaticValueProvider.of(options.getSpannerProjectId()))
          .withInstanceId(ValueProvider.StaticValueProvider.of(options.getInstanceId()))
          .withDatabaseId(ValueProvider.StaticValueProvider.of(options.getDatabaseId()));
  SpannerConfig spannerMetadataConfig =
      SpannerConfig.create()
          .withProjectId(ValueProvider.StaticValueProvider.of(options.getSpannerProjectId()))
          .withInstanceId(ValueProvider.StaticValueProvider.of(options.getMetadataInstance()))
          .withDatabaseId(ValueProvider.StaticValueProvider.of(options.getMetadataDatabase()));

  // The metadata tables use dialect-specific DDL/DML, so detect the dialect up front.
  // NOTE(review): the SpannerAccessor obtained here is shared/ref-counted and is not closed in
  // this method — confirm whether a matching close() is expected by SpannerAccessor's contract.
  boolean isMetadataDbPostgres =
      Dialect.POSTGRESQL
          == SpannerAccessor.getOrCreate(spannerMetadataConfig)
              .getDatabaseAdminClient()
              .getDatabase(
                  spannerMetadataConfig.getInstanceId().get(),
                  spannerMetadataConfig.getDatabaseId().get())
              .getDialect();

  // Session file and source DDL are only needed to route records in multi-shard mode.
  Schema schema = null;
  Ddl ddl = null;
  if (shardingMode.equals(Constants.SHARDING_MODE_MULTI_SHARD)) {
    schema = SessionFileReader.read(options.getSessionFilePath());
    ddl = InformationSchemaReader.getInformationSchemaAsDdl(spannerConfig);
  }

  String tableSuffix = "";
  if (options.getMetadataTableSuffix() != null && !options.getMetadataTableSuffix().isEmpty()) {
    tableSuffix = options.getMetadataTableSuffix();
    // The suffix is spliced into table names, so restrict it to identifier-safe characters.
    if (!Pattern.compile("[a-zA-Z0-9_]+").matcher(tableSuffix).matches()) {
      throw new RuntimeException(
          "Only alpha numeric and underscores allowed in metadataTableSuffix, however found : "
              + tableSuffix);
    }
  }

  // Have a common start time stamp when updating the metadata tables
  // and when reading from change streams.
  SpannerDao spannerDao =
      new SpannerDao(
          options.getSpannerProjectId(),
          options.getMetadataInstance(),
          options.getMetadataDatabase(),
          tableSuffix,
          isMetadataDbPostgres);
  SpannerToGcsJobMetadata jobMetadata;
  // Close the DAO even if metadata setup throws, so Spanner resources are not leaked.
  try {
    jobMetadata = getStartTimeAndDuration(options, spannerDao);
    // Capture the window start time and duration config.
    // This is read by the GCSToSource template to ensure the same config is used in both
    // templates.
    if (options.getRunMode().equals(Constants.RUN_MODE_REGULAR)) {
      JobMetadataUpdater.writeStartAndDuration(
          spannerDao, options.getRunIdentifier(), jobMetadata);
    }
    // Initialize the per shard progress with historical value.
    // This makes it easier to fire blind UPDATES later on when
    // updating per shard file creation progress.
    FileCreationTracker fileCreationTracker =
        new FileCreationTracker(spannerDao, options.getRunIdentifier());
    fileCreationTracker.init(shards);
  } finally {
    spannerDao.close();
  }

  pipeline
      .apply(
          getReadChangeStreamDoFn(
              options, spannerConfig, Timestamp.parseTimestamp(jobMetadata.getStartTimestamp())))
      .apply("Reshuffle", Reshuffle.viaRandomKey())
      .apply(ParDo.of(new FilterRecordsFn(options.getFiltrationMode())))
      .apply(ParDo.of(new PreprocessRecordsFn()))
      .setCoder(SerializableCoder.of(TrimmedShardedDataChangeRecord.class))
      .apply(
          ParDo.of(
              new AssignShardIdFn(
                  spannerConfig,
                  schema,
                  ddl,
                  shardingMode,
                  shards.get(0).getLogicalShardId(),
                  options.getSkipDirectoryName(),
                  options.getShardingCustomJarPath(),
                  options.getShardingCustomClassName(),
                  options.getShardingCustomParameters())))
      // Label the step with the duration actually used for windowing (the persisted job
      // metadata value, which can differ from options.getWindowDuration() on resume).
      .apply(
          "Creating " + jobMetadata.getWindowDuration() + " Window",
          Window.into(
              FixedWindows.of(DurationUtils.parseDuration(jobMetadata.getWindowDuration()))))
      .apply(
          "Tracking change data seen",
          ParDo.of(
              new ChangeDataProgressTrackerFn(
                  spannerMetadataConfig,
                  tableSuffix,
                  options.getRunIdentifier(),
                  isMetadataDbPostgres)))
      // Distinct name from the earlier "Reshuffle" step: Beam requires stable unique
      // transform names (duplicates fail under --stableUniqueNames=ERROR).
      .apply("Reshuffle change data", Reshuffle.viaRandomKey())
      .apply(
          "Write To GCS",
          WriterGCS.newBuilder()
              .withGcsOutputDirectory(options.getGcsOutputDirectory())
              .withTempLocation(options.getTempLocation())
              .build())
      .apply(
          "Creating file tracking window",
          Window.into(
              FixedWindows.of(DurationUtils.parseDuration(jobMetadata.getWindowDuration()))))
      .apply(
          "Tracking file progress ",
          ParDo.of(
              new FileProgressTrackerFn(
                  spannerMetadataConfig,
                  tableSuffix,
                  options.getRunIdentifier(),
                  isMetadataDbPostgres)));
  return pipeline.run();
}