public PCollection&lt;DataChangeRecord&gt; expand(PBegin input)

in v1/src/main/java/com/google/cloud/teleport/spanner/spannerio/SpannerIO.java [1730:1875]
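
For context, this expand() implements SpannerIO.readChangeStream(), which a pipeline typically applies as sketched below (a minimal sketch: the project, instance, database, and stream names are placeholders, and it assumes this vendored copy exposes the same builder methods as Beam's SpannerIO, i.e. withSpannerConfig, withChangeStreamName, withMetadataInstance, withMetadataDatabase, withInclusiveStartAt, withInclusiveEndAt). Omitting withInclusiveEndAt(...) leaves the default ChangeStreamsConstants.MAX_INCLUSIVE_END_AT, i.e. an effectively unbounded read.

    // Assumed imports: org.apache.beam.sdk.Pipeline, org.apache.beam.sdk.values.PCollection,
    // com.google.cloud.Timestamp, plus SpannerIO, SpannerConfig, and DataChangeRecord from the
    // packages used by this vendored connector.
    Pipeline pipeline = Pipeline.create();
    PCollection<DataChangeRecord> records =
        pipeline.apply(
            "Read Spanner change stream",
            SpannerIO.readChangeStream()
                .withSpannerConfig(
                    SpannerConfig.create()
                        .withProjectId("my-project")      // placeholder
                        .withInstanceId("my-instance")    // placeholder
                        .withDatabaseId("my-database"))   // placeholder
                .withChangeStreamName("my-change-stream") // placeholder
                .withMetadataInstance("my-metadata-instance")
                .withMetadataDatabase("my-metadata-database")
                .withInclusiveStartAt(Timestamp.now()));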


    public PCollection<DataChangeRecord> expand(PBegin input) {
      checkArgument(
          getSpannerConfig() != null,
          "SpannerIO.readChangeStream() requires the spanner config to be set.");
      checkArgument(
          getSpannerConfig().getProjectId() != null,
          "SpannerIO.readChangeStream() requires the project ID to be set.");
      checkArgument(
          getSpannerConfig().getInstanceId() != null,
          "SpannerIO.readChangeStream() requires the instance ID to be set.");
      checkArgument(
          getSpannerConfig().getDatabaseId() != null,
          "SpannerIO.readChangeStream() requires the database ID to be set.");
      checkArgument(
          getChangeStreamName() != null,
          "SpannerIO.readChangeStream() requires the name of the change stream to be set.");
      checkArgument(
          getInclusiveStartAt() != null,
          "SpannerIO.readChangeStream() requires the start time to be set.");
      // Inclusive end at defaults to ChangeStreamsConstants.MAX_INCLUSIVE_END_AT
      checkArgument(
          getInclusiveEndAt() != null,
          "SpannerIO.readChangeStream() requires the end time to be set. If you'd like to process the stream without an end time, you can omit this parameter.");
      if (getMetadataInstance() != null) {
        checkArgument(
            getMetadataDatabase() != null,
            "SpannerIO.readChangeStream() requires the metadata database to be set if metadata instance is set.");
      }

      // Start time must be before end time
      if (getInclusiveEndAt() != null
          && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimestamp())) {
        throw new IllegalArgumentException("Start time cannot be after end time.");
      }

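      // Resolve where the partition metadata lives: fall back to the change stream's own
      // instance/database when no metadata instance/database was configured.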
      final DatabaseId changeStreamDatabaseId =
          DatabaseId.of(
              getSpannerConfig().getProjectId().get(),
              getSpannerConfig().getInstanceId().get(),
              getSpannerConfig().getDatabaseId().get());
      final String partitionMetadataInstanceId =
          MoreObjects.firstNonNull(
              getMetadataInstance(), changeStreamDatabaseId.getInstanceId().getInstance());
      final String partitionMetadataDatabaseId =
          MoreObjects.firstNonNull(getMetadataDatabase(), changeStreamDatabaseId.getDatabase());
      final DatabaseId fullPartitionMetadataDatabaseId =
          DatabaseId.of(
              getSpannerConfig().getProjectId().get(),
              partitionMetadataInstanceId,
              partitionMetadataDatabaseId);

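      // Build separate Spanner configs for the change stream database and the partition metadata
      // database, and detect each database's dialect.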
      final SpannerConfig changeStreamSpannerConfig = buildChangeStreamSpannerConfig();
      final SpannerConfig partitionMetadataSpannerConfig =
          MetadataSpannerConfigFactory.create(
              changeStreamSpannerConfig, partitionMetadataInstanceId, partitionMetadataDatabaseId);
      final Dialect changeStreamDatabaseDialect =
          getDialect(changeStreamSpannerConfig, input.getPipeline().getOptions());
      final Dialect metadataDatabaseDialect =
          getDialect(partitionMetadataSpannerConfig, input.getPipeline().getOptions());
      LOG.info(
          "The Spanner database "
              + changeStreamDatabaseId
              + " has dialect "
              + changeStreamDatabaseDialect);
      LOG.info(
          "The Spanner database "
              + fullPartitionMetadataDatabaseId
              + " has dialect "
              + metadataDatabaseDialect);
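      // Use the configured metadata table if one was supplied; otherwise generate random names
      // for the partition metadata table.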
      PartitionMetadataTableNames partitionMetadataTableNames =
          Optional.ofNullable(getMetadataTable())
              .map(
                  table ->
                      PartitionMetadataTableNames.fromExistingTable(
                          partitionMetadataDatabaseId, table))
              .orElse(PartitionMetadataTableNames.generateRandom(partitionMetadataDatabaseId));
      final String changeStreamName = getChangeStreamName();
      final Timestamp startTimestamp = getInclusiveStartAt();
      // Caps the end timestamp at (Timestamp.MAX - 1ns), because 1ns is added to turn the
      // inclusive interval into a closed-open one in the read change stream restriction
      // (prevents overflow).
      final Timestamp endTimestamp =
          getInclusiveEndAt().compareTo(MAX_INCLUSIVE_END_AT) > 0
              ? MAX_INCLUSIVE_END_AT
              : getInclusiveEndAt();
      final MapperFactory mapperFactory = new MapperFactory(changeStreamDatabaseDialect);
      final ChangeStreamMetrics metrics = new ChangeStreamMetrics();
      final RpcPriority rpcPriority = MoreObjects.firstNonNull(getRpcPriority(), RpcPriority.HIGH);
      final DaoFactory daoFactory =
          new DaoFactory(
              changeStreamSpannerConfig,
              changeStreamName,
              partitionMetadataSpannerConfig,
              partitionMetadataTableNames,
              rpcPriority,
              input.getPipeline().getOptions().getJobName(),
              changeStreamDatabaseDialect,
              metadataDatabaseDialect);
      final ActionFactory actionFactory = new ActionFactory();

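      // Cache used by the detect-new-partitions stage; the watermark refresh rate defaults to
      // DEFAULT_WATERMARK_REFRESH_RATE when not set.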
      final Duration watermarkRefreshRate =
          MoreObjects.firstNonNull(getWatermarkRefreshRate(), DEFAULT_WATERMARK_REFRESH_RATE);
      final CacheFactory cacheFactory = new CacheFactory(daoFactory, watermarkRefreshRate);

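      // The connector is a chain of DoFns: initialize the connector, detect new partitions,
      // read each partition's change stream, and gather post-processing metrics.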
      final InitializeDoFn initializeDoFn =
          new InitializeDoFn(daoFactory, mapperFactory, startTimestamp, endTimestamp);
      final DetectNewPartitionsDoFn detectNewPartitionsDoFn =
          new DetectNewPartitionsDoFn(
              daoFactory, mapperFactory, actionFactory, cacheFactory, metrics);
      final ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
          new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory, actionFactory, metrics);
      final PostProcessingMetricsDoFn postProcessingMetricsDoFn =
          new PostProcessingMetricsDoFn(metrics);

      LOG.info(
          "Partition metadata table that will be used is "
              + partitionMetadataTableNames.getTableName());

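      // A single impulse seeds initialization and partition detection. The average serialized
      // size of a partition metadata record is estimated from a sample and fed back into the
      // detect-partitions DoFn.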
      final PCollection<byte[]> impulseOut = input.apply(Impulse.create());
      final PCollection<PartitionMetadata> partitionsOut =
          impulseOut
              .apply("Initialize the connector", ParDo.of(initializeDoFn))
              .apply("Detect new partitions", ParDo.of(detectNewPartitionsDoFn));
      final Coder<PartitionMetadata> partitionMetadataCoder = partitionsOut.getCoder();
      final SizeEstimator<PartitionMetadata> partitionMetadataSizeEstimator =
          new SizeEstimator<>(partitionMetadataCoder);
      final long averagePartitionBytesSize =
          partitionMetadataSizeEstimator.sizeOf(ChangeStreamsConstants.SAMPLE_PARTITION);
      detectNewPartitionsDoFn.setAveragePartitionBytesSize(averagePartitionBytesSize);

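      // Read each partition's change stream and gather metrics. A bytes-throughput estimator
      // built from the output record coder is handed back to the read DoFn.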
      final PCollection<DataChangeRecord> dataChangeRecordsOut =
          partitionsOut
              .apply("Read change stream partition", ParDo.of(readChangeStreamPartitionDoFn))
              .apply("Gather metrics", ParDo.of(postProcessingMetricsDoFn));
      final Coder<DataChangeRecord> dataChangeRecordCoder = dataChangeRecordsOut.getCoder();
      final SizeEstimator<DataChangeRecord> dataChangeRecordSizeEstimator =
          new SizeEstimator<>(dataChangeRecordCoder);
      final BytesThroughputEstimator<DataChangeRecord> throughputEstimator =
          new BytesThroughputEstimator<>(THROUGHPUT_WINDOW_SECONDS, dataChangeRecordSizeEstimator);
      readChangeStreamPartitionDoFn.setThroughputEstimator(throughputEstimator);

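      // Once the change stream output completes (Wait.on), run CleanUpReadChangeStreamDoFn to
      // tear down the partition metadata state.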
      impulseOut
          .apply(WithTimestamps.of(e -> GlobalWindow.INSTANCE.maxTimestamp()))
          .apply(Wait.on(dataChangeRecordsOut))
          .apply(ParDo.of(new CleanUpReadChangeStreamDoFn(daoFactory)));
      return dataChangeRecordsOut;
    }