public static PipelineResult run(Options options)

in v2/spanner-change-streams-to-sharded-file-sink/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToShardedFileSink.java [352:483]


  /**
   * Builds and launches the pipeline that reads Spanner change stream records, assigns each
   * record to a logical shard, and writes windowed files to GCS, while tracking per-shard
   * progress in Spanner metadata tables.
   *
   * @param options pipeline options supplied at template launch
   * @return the {@link PipelineResult} handle of the launched pipeline
   * @throws RuntimeException if the source shards file is empty, or if {@code
   *     metadataTableSuffix} contains characters other than alphanumerics and underscores
   */
  public static PipelineResult run(Options options) {

    Pipeline pipeline = Pipeline.create(options);
    ShardFileReader shardFileReader = new ShardFileReader(new SecretManagerAccessorImpl());
    List<Shard> shards = shardFileReader.getOrderedShardDetails(options.getSourceShardsFilePath());
    if (shards == null || shards.isEmpty()) {
      throw new RuntimeException("The source shards file cannot be empty");
    }

    // A single shard uses the simpler single-shard routing; more than one shard requires the
    // session-file based multi-shard routing configured further below.
    String shardingMode = Constants.SHARDING_MODE_SINGLE_SHARD;
    if (shards.size() > 1) {
      shardingMode = Constants.SHARDING_MODE_MULTI_SHARD;
    }

    // Prepare Spanner config for the database whose change stream is read.
    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withProjectId(ValueProvider.StaticValueProvider.of(options.getSpannerProjectId()))
            .withInstanceId(ValueProvider.StaticValueProvider.of(options.getInstanceId()))
            .withDatabaseId(ValueProvider.StaticValueProvider.of(options.getDatabaseId()));

    // Separate config for the metadata database that stores job/progress bookkeeping.
    SpannerConfig spannerMetadataConfig =
        SpannerConfig.create()
            .withProjectId(ValueProvider.StaticValueProvider.of(options.getSpannerProjectId()))
            .withInstanceId(ValueProvider.StaticValueProvider.of(options.getMetadataInstance()))
            .withDatabaseId(ValueProvider.StaticValueProvider.of(options.getMetadataDatabase()));
    // The metadata DAO and progress trackers need to know whether the metadata database speaks
    // PostgreSQL dialect, as the SQL they issue differs.
    // NOTE(review): the SpannerAccessor obtained here is not released after the dialect lookup;
    // confirm whether a close() is needed, since SpannerAccessor instances are shared/cached.
    boolean isMetadataDbPostgres =
        Dialect.POSTGRESQL
            == SpannerAccessor.getOrCreate(spannerMetadataConfig)
                .getDatabaseAdminClient()
                .getDatabase(
                    spannerMetadataConfig.getInstanceId().get(),
                    spannerMetadataConfig.getDatabaseId().get())
                .getDialect();

    // Session file and information schema are only needed to route records in multi-shard mode.
    Schema schema = null;
    Ddl ddl = null;
    if (shardingMode.equals(Constants.SHARDING_MODE_MULTI_SHARD)) {
      schema = SessionFileReader.read(options.getSessionFilePath());
      ddl = InformationSchemaReader.getInformationSchemaAsDdl(spannerConfig);
    }

    // Validate the optional metadata table suffix: it is interpolated into table names, so only
    // alphanumerics and underscores are permitted.
    String tableSuffix = "";
    if (options.getMetadataTableSuffix() != null && !options.getMetadataTableSuffix().isEmpty()) {

      tableSuffix = options.getMetadataTableSuffix();
      if (!Pattern.compile("[a-zA-Z0-9_]+").matcher(tableSuffix).matches()) {
        throw new RuntimeException(
            "Only alpha numeric and underscores allowed in metadataTableSuffix, however found : "
                + tableSuffix);
      }
    }

    // Have a common start time stamp when updating the metadata tables
    // And when reading from change streams
    SpannerDao spannerDao =
        new SpannerDao(
            options.getSpannerProjectId(),
            options.getMetadataInstance(),
            options.getMetadataDatabase(),
            tableSuffix,
            isMetadataDbPostgres);
    SpannerToGcsJobMetadata jobMetadata = getStartTimeAndDuration(options, spannerDao);

    // Capture the window start time and duration config.
    // This is read by the GCSToSource template to ensure the same config is used in both templates.
    // Constant-first equals avoids an NPE when the run mode is unset.
    if (Constants.RUN_MODE_REGULAR.equals(options.getRunMode())) {
      JobMetadataUpdater.writeStartAndDuration(spannerDao, options.getRunIdentifier(), jobMetadata);
    }

    // Initialize the per shard progress with historical value
    // This makes it easier to fire blind UPDATES later on when
    // updating per shard file creation progress
    FileCreationTracker fileCreationTracker =
        new FileCreationTracker(spannerDao, options.getRunIdentifier());
    fileCreationTracker.init(shards);

    spannerDao.close();

    pipeline
        .apply(
            getReadChangeStreamDoFn(
                options, spannerConfig, Timestamp.parseTimestamp(jobMetadata.getStartTimestamp())))
        .apply("Reshuffle", Reshuffle.viaRandomKey())
        .apply(ParDo.of(new FilterRecordsFn(options.getFiltrationMode())))
        .apply(ParDo.of(new PreprocessRecordsFn()))
        .setCoder(SerializableCoder.of(TrimmedShardedDataChangeRecord.class))
        .apply(
            ParDo.of(
                new AssignShardIdFn(
                    spannerConfig,
                    schema,
                    ddl,
                    shardingMode,
                    shards.get(0).getLogicalShardId(),
                    options.getSkipDirectoryName(),
                    options.getShardingCustomJarPath(),
                    options.getShardingCustomClassName(),
                    options.getShardingCustomParameters())))
        .apply(
            // Label with the duration that is actually applied (jobMetadata may carry a value
            // recovered from a previous run, not the one passed via options).
            "Creating " + jobMetadata.getWindowDuration() + " Window",
            Window.into(
                FixedWindows.of(DurationUtils.parseDuration(jobMetadata.getWindowDuration()))))
        .apply(
            "Tracking change data seen",
            ParDo.of(
                new ChangeDataProgressTrackerFn(
                    spannerMetadataConfig,
                    tableSuffix,
                    options.getRunIdentifier(),
                    isMetadataDbPostgres)))
        // Distinct name: Beam requires unique stable transform names within a pipeline; reusing
        // "Reshuffle" triggers auto-uniquification warnings and breaks pipeline update.
        .apply("Reshuffle before write", Reshuffle.viaRandomKey())
        .apply(
            "Write To GCS",
            WriterGCS.newBuilder()
                .withGcsOutputDirectory(options.getGcsOutputDirectory())
                .withTempLocation(options.getTempLocation())
                .build())
        .apply(
            "Creating file tracking window",
            Window.into(
                FixedWindows.of(DurationUtils.parseDuration(jobMetadata.getWindowDuration()))))
        .apply(
            "Tracking file progress ",
            ParDo.of(
                new FileProgressTrackerFn(
                    spannerMetadataConfig,
                    tableSuffix,
                    options.getRunIdentifier(),
                    isMetadataDbPostgres)));
    return pipeline.run();
  }