in v1/src/main/java/com/google/cloud/teleport/spanner/spannerio/SpannerIO.java [1730:1875]
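/**
 * Builds the change stream reading pipeline. A minimal usage sketch, adapted from the Apache
 * Beam SpannerIO Javadoc (the stream, instance, and database names below are placeholders):
 *
 * <pre>{@code
 * PCollection<DataChangeRecord> records =
 *     pipeline.apply(
 *         SpannerIO.readChangeStream()
 *             .withSpannerConfig(spannerConfig)
 *             .withChangeStreamName("my-change-stream")
 *             .withMetadataInstance("my-metadata-instance")
 *             .withMetadataDatabase("my-metadata-database")
 *             .withInclusiveStartAt(Timestamp.now()));
 * }</pre>
 */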
public PCollection<DataChangeRecord> expand(PBegin input) {
checkArgument(
getSpannerConfig() != null,
"SpannerIO.readChangeStream() requires the spanner config to be set.");
checkArgument(
getSpannerConfig().getProjectId() != null,
"SpannerIO.readChangeStream() requires the project ID to be set.");
checkArgument(
getSpannerConfig().getInstanceId() != null,
"SpannerIO.readChangeStream() requires the instance ID to be set.");
checkArgument(
getSpannerConfig().getDatabaseId() != null,
"SpannerIO.readChangeStream() requires the database ID to be set.");
checkArgument(
getChangeStreamName() != null,
"SpannerIO.readChangeStream() requires the name of the change stream to be set.");
checkArgument(
getInclusiveStartAt() != null,
"SpannerIO.readChangeStream() requires the start time to be set.");
// InclusiveEndAt defaults to ChangeStreamsConstants.MAX_INCLUSIVE_END_AT when unset
checkArgument(
getInclusiveEndAt() != null,
"SpannerIO.readChangeStream() requires the end time to be set. If you'd like to process the stream without an end time, you can omit this parameter.");
if (getMetadataInstance() != null) {
checkArgument(
getMetadataDatabase() != null,
"SpannerIO.readChangeStream() requires the metadata database to be set if metadata instance is set.");
}
// Start time must not be after end time
if (getInclusiveEndAt() != null
&& getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimestamp())) {
throw new IllegalArgumentException("Start time cannot be after end time.");
}
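// Resolve where the partition metadata lives: when no metadata instance or database is
// configured, fall back to the change stream database's own instance and database.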
final DatabaseId changeStreamDatabaseId =
DatabaseId.of(
getSpannerConfig().getProjectId().get(),
getSpannerConfig().getInstanceId().get(),
getSpannerConfig().getDatabaseId().get());
final String partitionMetadataInstanceId =
MoreObjects.firstNonNull(
getMetadataInstance(), changeStreamDatabaseId.getInstanceId().getInstance());
final String partitionMetadataDatabaseId =
MoreObjects.firstNonNull(getMetadataDatabase(), changeStreamDatabaseId.getDatabase());
final DatabaseId fullPartitionMetadataDatabaseId =
DatabaseId.of(
getSpannerConfig().getProjectId().get(),
partitionMetadataInstanceId,
partitionMetadataDatabaseId);
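// Build a SpannerConfig per database and detect each database's dialect (GoogleSQL or
// PostgreSQL), since the SQL issued for change stream queries and partition metadata
// bookkeeping differs between the two dialects.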
final SpannerConfig changeStreamSpannerConfig = buildChangeStreamSpannerConfig();
final SpannerConfig partitionMetadataSpannerConfig =
MetadataSpannerConfigFactory.create(
changeStreamSpannerConfig, partitionMetadataInstanceId, partitionMetadataDatabaseId);
final Dialect changeStreamDatabaseDialect =
getDialect(changeStreamSpannerConfig, input.getPipeline().getOptions());
final Dialect metadataDatabaseDialect =
getDialect(partitionMetadataSpannerConfig, input.getPipeline().getOptions());
LOG.info(
"The Spanner database {} has dialect {}",
changeStreamDatabaseId,
changeStreamDatabaseDialect);
LOG.info(
"The Spanner database {} has dialect {}",
fullPartitionMetadataDatabaseId,
metadataDatabaseDialect);
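// Use the caller-provided metadata table if one was configured; otherwise generate random
// table and index names scoped to the metadata database so concurrent pipelines do not
// collide.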
PartitionMetadataTableNames partitionMetadataTableNames =
Optional.ofNullable(getMetadataTable())
.map(
table ->
PartitionMetadataTableNames.fromExistingTable(
partitionMetadataDatabaseId, table))
.orElseGet(
() -> PartitionMetadataTableNames.generateRandom(partitionMetadataDatabaseId));
final String changeStreamName = getChangeStreamName();
final Timestamp startTimestamp = getInclusiveStartAt();
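// The user-facing interval [inclusiveStartAt, inclusiveEndAt] is closed on both ends, while
// the restriction tracker works on a closed-open range, so 1ns is added to the end later.
// For example, [T, T] becomes [T, T + 1ns), which still reads exactly timestamp T.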
// Cap the end timestamp at MAX_INCLUSIVE_END_AT (Timestamp.MAX - 1ns) so that adding the
// extra 1ns cannot overflow
final Timestamp endTimestamp =
getInclusiveEndAt().compareTo(MAX_INCLUSIVE_END_AT) > 0
? MAX_INCLUSIVE_END_AT
: getInclusiveEndAt();
final MapperFactory mapperFactory = new MapperFactory(changeStreamDatabaseDialect);
final ChangeStreamMetrics metrics = new ChangeStreamMetrics();
final RpcPriority rpcPriority = MoreObjects.firstNonNull(getRpcPriority(), RpcPriority.HIGH);
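// The DAO factory hands out data access objects for both databases: change stream queries go
// to the change stream database, while partition bookkeeping reads and writes go to the
// partition metadata table.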
final DaoFactory daoFactory =
new DaoFactory(
changeStreamSpannerConfig,
changeStreamName,
partitionMetadataSpannerConfig,
partitionMetadataTableNames,
rpcPriority,
input.getPipeline().getOptions().getJobName(),
changeStreamDatabaseDialect,
metadataDatabaseDialect);
final ActionFactory actionFactory = new ActionFactory();
final Duration watermarkRefreshRate =
MoreObjects.firstNonNull(getWatermarkRefreshRate(), DEFAULT_WATERMARK_REFRESH_RATE);
final CacheFactory cacheFactory = new CacheFactory(daoFactory, watermarkRefreshRate);
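// The connector is assembled from three main DoFns:
// - InitializeDoFn creates the partition metadata table and seeds it with the initial
//   parent partition covering [startTimestamp, endTimestamp].
// - DetectNewPartitionsDoFn is a splittable DoFn that polls the metadata table and
//   schedules newly discovered partitions for reading.
// - ReadChangeStreamPartitionDoFn is a splittable DoFn that runs the change stream query
//   for a single partition and emits its records.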
final InitializeDoFn initializeDoFn =
new InitializeDoFn(daoFactory, mapperFactory, startTimestamp, endTimestamp);
final DetectNewPartitionsDoFn detectNewPartitionsDoFn =
new DetectNewPartitionsDoFn(
daoFactory, mapperFactory, actionFactory, cacheFactory, metrics);
final ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory, actionFactory, metrics);
final PostProcessingMetricsDoFn postProcessingMetricsDoFn =
new PostProcessingMetricsDoFn(metrics);
LOG.info(
"Partition metadata table that will be used is {}",
partitionMetadataTableNames.getTableName());
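// Wire the pipeline: a single Impulse element bootstraps initialization, after which
// partition discovery continuously emits PartitionMetadata elements downstream.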
final PCollection<byte[]> impulseOut = input.apply(Impulse.create());
final PCollection<PartitionMetadata> partitionsOut =
impulseOut
.apply("Initialize the connector", ParDo.of(initializeDoFn))
.apply("Detect new partitions", ParDo.of(detectNewPartitionsDoFn));
final Coder<PartitionMetadata> partitionMetadataCoder = partitionsOut.getCoder();
final SizeEstimator<PartitionMetadata> partitionMetadataSizeEstimator =
new SizeEstimator<>(partitionMetadataCoder);
final long averagePartitionBytesSize =
partitionMetadataSizeEstimator.sizeOf(ChangeStreamsConstants.SAMPLE_PARTITION);
detectNewPartitionsDoFn.setAveragePartitionBytesSize(averagePartitionBytesSize);
final PCollection<DataChangeRecord> dataChangeRecordsOut =
partitionsOut
.apply("Read change stream partition", ParDo.of(readChangeStreamPartitionDoFn))
.apply("Gather metrics", ParDo.of(postProcessingMetricsDoFn));
final Coder<DataChangeRecord> dataChangeRecordCoder = dataChangeRecordsOut.getCoder();
final SizeEstimator<DataChangeRecord> dataChangeRecordSizeEstimator =
new SizeEstimator<>(dataChangeRecordCoder);
final BytesThroughputEstimator<DataChangeRecord> throughputEstimator =
new BytesThroughputEstimator<>(THROUGHPUT_WINDOW_SECONDS, dataChangeRecordSizeEstimator);
readChangeStreamPartitionDoFn.setThroughputEstimator(throughputEstimator);
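// Clean up the partition metadata table once the main output completes. Wait.on holds the
// cleanup step back until dataChangeRecordsOut is done (relevant when an end timestamp makes
// the read bounded), and the impulse element is re-timestamped to the end of the global
// window so it is not released early.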
impulseOut
.apply(WithTimestamps.of(e -> GlobalWindow.INSTANCE.maxTimestamp()))
.apply(Wait.on(dataChangeRecordsOut))
.apply(ParDo.of(new CleanUpReadChangeStreamDoFn(daoFactory)));
return dataChangeRecordsOut;
}