public static PipelineResult run(Options options)

in v2/pubsub-cdc-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/PubSubCdcToBigQuery.java [286:532]


  public static PipelineResult run(Options options) {

    Pipeline pipeline = Pipeline.create(options);
    DeadLetterQueueManager dlqManager = buildDlqManager(options);
    String gcsOutputDateTimeDirectory = null;

    if (!Strings.isNullOrEmpty(options.getDeadLetterQueueDirectory())) {
      gcsOutputDateTimeDirectory = dlqManager.getRetryDlqDirectory() + "YYYY/MM/DD/HH/mm/";
    }
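
    // The "YYYY/MM/DD/HH/mm/" tokens are placeholders, not a literal path:
    // WindowedFilenamePolicy expands them from each window's timestamp when the
    // DLQ files are written. An illustrative result (bucket and prefix are
    // hypothetical): gs://my-bucket/dlq/retry/2024/01/15/10/05/error-00003-of-00020.json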

    CoderRegistry coderRegistry = pipeline.getCoderRegistry();
    coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);
    coderRegistry.registerCoderForType(
        FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);

    InputUDFToTableRow<String> failsafeTableRowTransformer =
        new InputUDFToTableRow<String>(
            options.getJavascriptTextTransformGcsPath(),
            options.getJavascriptTextTransformFunctionName(),
            options.getJavascriptTextTransformReloadIntervalMinutes(),
            options.getPythonTextTransformGcsPath(),
            options.getPythonTextTransformFunctionName(),
            options.getRuntimeRetries(),
            FAILSAFE_ELEMENT_CODER);
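
    // An illustrative UDF that the javascriptTextTransform* options could point
    // at (the GCS path and function name are hypothetical); the contract is a
    // JSON payload string in, a JSON payload string out:
    //
    //   // gs://my-bucket/udf/transform.js
    //   function process(inJson) {
    //     var row = JSON.parse(inJson);
    //     row.processed_at = new Date().toISOString(); // enrich the record
    //     return JSON.stringify(row);
    //   }
    //
    // launched with javascriptTextTransformGcsPath=gs://my-bucket/udf/transform.js
    // and javascriptTextTransformFunctionName=process.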

    BigQueryTableConfigManager bqConfigManager =
        new BigQueryTableConfigManager(
            options.as(GcpOptions.class).getProject(),
            options.getOutputDatasetTemplate(),
            options.getOutputTableNameTemplate(),
            options.getOutputTableSpec());
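
    // The dataset/table templates may embed {column} tokens that are resolved per
    // record against the converted TableRow (a sketch; these field names are
    // hypothetical):
    //   outputDatasetTemplate   = "cdc_{schema_name}"
    //   outputTableNameTemplate = "{table_name}_replica"
    // A row with schema_name=sales and table_name=orders would then route to
    //   <project>:cdc_sales.orders_replica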

    /*
     * Steps:
     *  1) Read messages in from Pub/Sub
     *  2) Transform the PubsubMessages into TableRows
     *     - Transform message payload via UDF
     *     - Convert UDF result to TableRow objects
     *  3) Write successful records out to BigQuery
     *     - Automap new objects to BigQuery if enabled
     *     - Write records to BigQuery tables
     *  4) Write failed records out to the dead letter queue
     *     - GCS DLQ files when a DLQ directory is configured
     *     - BigQuery deadletter table otherwise
     */

    /*
     * Step #1: Read messages in from Pub/Sub
     */

    PCollection<PubsubMessage> messages =
        pipeline.apply(
            "ReadPubSubSubscription",
            PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()));

    PCollection<FailsafeElement<String, String>> jsonRecords;

    if (!Strings.isNullOrEmpty(options.getDeadLetterQueueDirectory())) {

      PCollection<FailsafeElement<String, String>> failsafeMessages =
          messages.apply("ConvertPubSubToFailsafe", ParDo.of(new PubSubToFailSafeElement()));

      PCollection<FailsafeElement<String, String>> dlqJsonRecords =
          pipeline
              .apply(dlqManager.dlqReconsumer())
              .apply(
                  ParDo.of(
                      new DoFn<String, FailsafeElement<String, String>>() {
                        @ProcessElement
                        public void process(
                            @Element String input,
                            OutputReceiver<FailsafeElement<String, String>> receiver) {
                          receiver.output(FailsafeElement.of(input, input));
                        }
                      }))
              .setCoder(FAILSAFE_ELEMENT_CODER);

      jsonRecords =
          PCollectionList.of(failsafeMessages).and(dlqJsonRecords).apply(Flatten.pCollections());
    } else {
      jsonRecords =
          messages.apply("ConvertPubSubToFailsafe", ParDo.of(new PubSubToFailSafeElement()));
    }
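
    // A FailsafeElement pairs the original payload with the current payload so a
    // failure at any later step can be emitted with full context. For a fresh
    // Pub/Sub message the two start out identical, e.g. (illustrative):
    //   FailsafeElement.of("{\"id\":1}", "{\"id\":1}")
    // After the UDF runs, the second value carries the transformed JSON while the
    // first still holds the original message.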

    PCollectionTuple convertedTableRows =
        jsonRecords
            /*
             * Step #2: Transform the PubsubMessages into TableRows
             */
            .apply(
                Reshuffle.<FailsafeElement<String, String>>viaRandomKey()
                    .withNumBuckets(options.getThreadCount()))
            .apply("ApplyUdfAndConvertToTableRow", failsafeTableRowTransformer);

    /*
     * Step #3: Write the successful records out to BigQuery
     *   Either extract table destination only
     *   or extract table destination and auto-map new columns
     */
    PCollection<KV<TableId, TableRow>> tableEvents;
    if (options.getAutoMapTables()) {
      tableEvents =
          convertedTableRows
              .get(failsafeTableRowTransformer.transformOut)
              .apply(
                  "Map Data to BigQuery Tables",
                  new BigQueryMappers(bqConfigManager.getProjectId())
                      .buildBigQueryTableMapper(
                          bqConfigManager.getDatasetTemplate(), bqConfigManager.getTableTemplate())
                      .withDefaultSchemaFromGCS(options.getSchemaFilePath()));

    } else {
      tableEvents =
          convertedTableRows
              .get(failsafeTableRowTransformer.transformOut)
              .apply(
                  "ExtractBigQueryTableDestination",
                  BigQueryDynamicConverters.extractTableRowDestination(
                      bqConfigManager.getProjectId(),
                      bqConfigManager.getDatasetTemplate(),
                      bqConfigManager.getTableTemplate()));
    }
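
    // Either way, tableEvents pairs every TableRow with its resolved destination,
    // e.g. (illustrative values):
    //   KV.of(TableId.of("my-project", "cdc_sales", "orders_replica"), row)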

    /*
     * Step #3 (cont.): Write rows out to BigQuery
     */
    // TODO(https://github.com/apache/beam/pull/12004): Switch out alwaysRetry
    WriteResult writeResult =
        tableEvents.apply(
            "WriteSuccessfulRecords",
            BigQueryIO.<KV<TableId, TableRow>>write()
                .to(new BigQueryDynamicConverters().bigQueryDynamicDestination())
                .withFormatFunction(element -> element.getValue())
                .withoutValidation()
                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withExtendedErrorInfo()
                .withFailedInsertRetryPolicy(InsertRetryPolicy.alwaysRetry()));
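
    // withExtendedErrorInfo() exposes rows that failed streaming inserts, along
    // with their error details, on the WriteResult; both branches below drain
    // that stream into a dead letter sink via BigQueryIOUtils.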

    /*
     * Step #4: Write failures out for reprocessing
     *    - Rows that failed insertion into BigQuery are extracted from the
     *      WriteResult and written to the GCS dead letter queue (or, when no
     *      DLQ directory is configured, to a BigQuery deadletter table).
     */
    // TODO: Cover tableRowRecords.get(TRANSFORM_DEADLETTER_OUT) error values
    if (!Strings.isNullOrEmpty(options.getDeadLetterQueueDirectory())) {

      BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
          .apply(
              "DLQ: Write Insert Failures to GCS",
              MapElements.via(new BigQueryDeadLetterQueueSanitizer()))
          .apply(
              "Creating " + options.getWindowDuration() + " Window",
              Window.into(
                  FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
          .apply(
              "DLQ: Write File(s)",
              TextIO.write()
                  .withWindowedWrites()
                  .withNumShards(20)
                  .to(
                      WindowedFilenamePolicy.writeWindowedFiles()
                          .withOutputDirectory(gcsOutputDateTimeDirectory)
                          .withOutputFilenamePrefix("error")
                          .withShardTemplate("-SSSSS-of-NNNNN")
                          .withSuffix(".json"))
                  .withTempDirectory(
                      FileBasedSink.convertToFileResourceIfPossible(
                          options.getDeadLetterQueueDirectory())));

      PCollection<FailsafeElement<String, String>> transformDeadletter =
          PCollectionList.of(
                  ImmutableList.of(
                      convertedTableRows.get(failsafeTableRowTransformer.udfDeadletterOut),
                      convertedTableRows.get(failsafeTableRowTransformer.transformDeadletterOut)))
              .apply("Flatten", Flatten.pCollections())
              .apply(
                  "Creating " + options.getWindowDuration() + " Window",
                  Window.into(
                      FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))));

      PCollection<String> dlqWindowing =
          transformDeadletter
              .apply("Sanitize records", MapElements.via(new StringDeadLetterQueueSanitizer()))
              .setCoder(StringUtf8Coder.of());

      dlqWindowing.apply(
          "DLQ: Write File(s)",
          TextIO.write()
              .withWindowedWrites()
              .withNumShards(20)
              .to(
                  WindowedFilenamePolicy.writeWindowedFiles()
                      .withOutputDirectory(gcsOutputDateTimeDirectory)
                      .withOutputFilenamePrefix("error")
                      .withShardTemplate("-SSSSS-of-NNNNN")
                      .withSuffix(".json"))
              .withTempDirectory(
                  FileBasedSink.convertToFileResourceIfPossible(
                      gcsOutputDateTimeDirectory + "tmp/")));

    } else {
      PCollection<FailsafeElement<String, String>> failedInserts =
          BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
              .apply(
                  "WrapInsertionErrors",
                  MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                      .via(
                          (BigQueryInsertError e) -> BigQueryConverters.wrapBigQueryInsertError(e)))
              .setCoder(FAILSAFE_ELEMENT_CODER);

      /*
       * Step #4: Write records that failed table row transformation
       * or conversion out to BigQuery deadletter table.
       */
      PCollectionList.of(
              ImmutableList.of(
                  convertedTableRows.get(failsafeTableRowTransformer.udfDeadletterOut),
                  convertedTableRows.get(failsafeTableRowTransformer.transformDeadletterOut)))
          .apply("Flatten", Flatten.pCollections())
          .apply(
              "WriteFailedRecords",
              ErrorConverters.WriteStringMessageErrors.newBuilder()
                  .setErrorRecordsTable(
                      BigQueryConverters.maybeUseDefaultDeadletterTable(
                          options.getOutputDeadletterTable(),
                          bqConfigManager.getOutputTableSpec(),
                          DEFAULT_DEADLETTER_TABLE_SUFFIX))
                  .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
                  .build());
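
      // When no explicit deadletter table is configured, maybeUseDefaultDeadletterTable
      // derives one by appending DEFAULT_DEADLETTER_TABLE_SUFFIX to the output table
      // spec, e.g. (illustrative): my-project:my_dataset.my_table_error_records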

      /*
       * Step #5: Write records that failed insertion into BigQuery out to the
       * deadletter table.
       */
      failedInserts.apply(
          "WriteFailedInserts",
          ErrorConverters.WriteStringMessageErrors.newBuilder()
              .setErrorRecordsTable(
                  BigQueryConverters.maybeUseDefaultDeadletterTable(
                      options.getOutputDeadletterTable(),
                      bqConfigManager.getOutputTableSpec(),
                      DEFAULT_DEADLETTER_TABLE_SUFFIX))
              .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
              .build());
    }

    return pipeline.run();
  }
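
A minimal caller sketch (an assumption about the surrounding class, not part of this
excerpt; Options is the options interface this method already accepts):

  public static void main(String[] args) {
    // Parse and validate pipeline options from the command line
    // (org.apache.beam.sdk.options.PipelineOptionsFactory).
    Options options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    run(options);
  }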