public static PipelineResult run()

in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/bigtablechangestreamstopubsub/BigtableChangeStreamsToPubSub.java [174:357]


  public static PipelineResult run(BigtableChangeStreamsToPubSubOptions options) {
    setOptions(options);
    validateOptions(options);

    String bigtableProject = getBigtableProjectId(options);

    // Retrieve and parse the startTimestamp
    Instant startTimestamp =
        options.getBigtableChangeStreamStartTimestamp().isEmpty()
            ? Instant.now()
            : toInstant(Timestamp.parseTimestamp(options.getBigtableChangeStreamStartTimestamp()));

    BigtableSource sourceInfo =
        new BigtableSource(
            options.getBigtableReadInstanceId(),
            options.getBigtableReadTableId(),
            getBigtableCharset(options),
            options.getBigtableChangeStreamIgnoreColumnFamilies(),
            options.getBigtableChangeStreamIgnoreColumns(),
            startTimestamp);

    Topic topic = null;
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      GetTopicRequest request =
          GetTopicRequest.newBuilder()
              .setTopic(
                  TopicName.ofProjectTopicName(
                          getPubSubProjectId(options), options.getPubSubTopic())
                      .toString())
              .build();
      topic = topicAdminClient.getTopic(request);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }

    try {
      if (!validateSchema(topic, options, sourceInfo)) {
        final String errorMessage = "Configured topic doesn't accept messages of configured format";
        throw new IllegalArgumentException(errorMessage);
      }

    } catch (Exception e) {
      throw new IllegalArgumentException(e);
    }

    PubSubDestination destinationInfo = newPubSubDestination(options, topic);
    PubSubUtils pubSub = new PubSubUtils(sourceInfo, destinationInfo);

    /*
     * Stages:
     * 1) Read {@link ChangeStreamMutation} from change stream.
     * 2) Create {@link FailsafeElement} of {@link Mod} JSON and merge from:
     *    - {@link ChangeStreamMutation}.
     *    - GCS Dead letter queue.
     * 3) Convert {@link Mod} JSON into PubsubMessage and publish it to PubSub.
     * 4) Write Failures from 2) and 3) to GCS dead letter queue.
     */
    // Step 1
    Pipeline pipeline = Pipeline.create(options);

    // Register the coders for pipeline
    CoderRegistry coderRegistry = pipeline.getCoderRegistry();
    coderRegistry.registerCoderForType(
        FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);

    DeadLetterQueueManager dlqManager = buildDlqManager(options);

    String dlqDirectory = dlqManager.getRetryDlqDirectoryWithDateTime();
    String tempDlqDirectory = dlqManager.getRetryDlqDirectory() + "tmp/";

    if (options.getDisableDlqRetries()) {
      LOG.info(
          "Disabling retries for the DLQ, directly writing into severe DLQ: {}",
          dlqManager.getSevereDlqDirectoryWithDateTime());
      dlqDirectory = dlqManager.getSevereDlqDirectoryWithDateTime();
      tempDlqDirectory = dlqManager.getSevereDlqDirectory() + "tmp/";
    }

    BigtableIO.ReadChangeStream readChangeStream =
        BigtableIO.readChangeStream()
            .withChangeStreamName(options.getBigtableChangeStreamName())
            .withExistingPipelineOptions(
                options.getBigtableChangeStreamResume()
                    ? BigtableIO.ExistingPipelineOptions.RESUME_OR_FAIL
                    : BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS)
            .withProjectId(bigtableProject)
            .withMetadataTableInstanceId(options.getBigtableChangeStreamMetadataInstanceId())
            .withInstanceId(options.getBigtableReadInstanceId())
            .withTableId(options.getBigtableReadTableId())
            .withAppProfileId(options.getBigtableChangeStreamAppProfile())
            .withStartTime(startTimestamp);

    if (!StringUtils.isBlank(options.getBigtableChangeStreamMetadataTableTableId())) {
      readChangeStream =
          readChangeStream.withMetadataTableTableId(
              options.getBigtableChangeStreamMetadataTableTableId());
    }
    // Step 2: just return the output for sending to pubSub and DLQ
    PCollection<ChangeStreamMutation> dataChangeRecord =
        pipeline
            .apply("Read from Cloud Bigtable Change Streams", readChangeStream)
            .apply(Values.create());

    PCollection<FailsafeElement<String, String>> sourceFailsafeModJson =
        dataChangeRecord
            .apply(
                "ChangeStreamMutation To Mod JSON",
                ParDo.of(new ChangeStreamMutationToModJsonFn(sourceInfo)))
            .apply(
                "Wrap Mod JSON In FailsafeElement",
                ParDo.of(
                    new DoFn<String, FailsafeElement<String, String>>() {
                      @ProcessElement
                      public void process(
                          @Element String input,
                          OutputReceiver<FailsafeElement<String, String>> receiver) {
                        receiver.output(FailsafeElement.of(input, input));
                      }
                    }))
            .setCoder(FAILSAFE_ELEMENT_CODER);

    PCollectionTuple dlqModJson =
        dlqManager.getReconsumerDataTransform(
            pipeline.apply(dlqManager.dlqReconsumer(options.getDlqRetryMinutes())));

    PCollection<FailsafeElement<String, String>> retryableDlqFailsafeModJson = null;
    if (options.getDisableDlqRetries()) {
      retryableDlqFailsafeModJson = pipeline.apply(Create.empty(FAILSAFE_ELEMENT_CODER));
    } else {
      retryableDlqFailsafeModJson =
          dlqModJson.get(DeadLetterQueueManager.RETRYABLE_ERRORS).setCoder(FAILSAFE_ELEMENT_CODER);
    }

    PCollection<FailsafeElement<String, String>> failsafeModJson =
        PCollectionList.of(sourceFailsafeModJson)
            .and(retryableDlqFailsafeModJson)
            .apply("Merge Source And DLQ Mod JSON", Flatten.pCollections());

    FailsafePublisher.FailsafeModJsonToPubsubMessageOptions failsafeModJsonToPubsubOptions =
        FailsafePublisher.FailsafeModJsonToPubsubMessageOptions.builder()
            .setCoder(FAILSAFE_ELEMENT_CODER)
            .build();

    PublishModJsonToTopic publishModJsonToTopic =
        new PublishModJsonToTopic(pubSub, failsafeModJsonToPubsubOptions);

    PCollection<FailsafeElement<String, String>> failedToPublish =
        failsafeModJson.apply("Publish Mod JSON To Pubsub", publishModJsonToTopic);

    PCollection<String> transformDlqJson =
        failedToPublish.apply(
            "Failed Mod JSON During Table Row Transformation",
            MapElements.via(new StringDeadLetterQueueSanitizer()));

    PCollectionList.of(transformDlqJson)
        .apply("Merge Failed Mod JSON From Transform And PubSub", Flatten.pCollections())
        .apply(
            "Write Failed Mod JSON To DLQ",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqDirectory)
                .withTmpDirectory(tempDlqDirectory)
                .setIncludePaneInfo(true)
                .build());

    PCollection<FailsafeElement<String, String>> nonRetryableDlqModJsonFailsafe =
        dlqModJson.get(DeadLetterQueueManager.PERMANENT_ERRORS).setCoder(FAILSAFE_ELEMENT_CODER);
    LOG.info(
        "DLQ manager severe DLQ directory with date time: {}",
        dlqManager.getSevereDlqDirectoryWithDateTime());
    LOG.info("DLQ manager severe DLQ directory: {}", dlqManager.getSevereDlqDirectory() + "tmp/");
    nonRetryableDlqModJsonFailsafe
        .apply(
            "Write Mod JSON With Non-retriable Error To DLQ",
            MapElements.via(new StringDeadLetterQueueSanitizer()))
        .setCoder(StringUtf8Coder.of())
        .apply(
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getSevereDlqDirectoryWithDateTime())
                .withTmpDirectory(dlqManager.getSevereDlqDirectory() + "tmp/")
                .setIncludePaneInfo(true)
                .build());

    return pipeline.run();
  }