public static PipelineResult run(Options options)

in v2/datastream-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToBigQuery.java [418:614]


  public static PipelineResult run(Options options) {
    /*
     * Stages:
     *   1) Ingest and Normalize Data to FailsafeElement with JSON Strings
     *   2) Write JSON Strings to TableRow Collection
     *       - Optionally apply a UDF
     *   3) BigQuery Output of TableRow Data
     *     a) Map New Columns & Write to Staging Tables
     *     b) Map New Columns & Merge Staging to Target Table
     *   4) Write Failures to GCS Dead Letter Queue
     */

    Pipeline pipeline = Pipeline.create(options);
    DeadLetterQueueManager dlqManager = buildDlqManager(options);

    String bigqueryProjectId = getBigQueryProjectId(options);
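    // Retry DLQ paths: a date-time-stamped directory for retriable failures, plus a tmp/
    // subdirectory used as the temporary directory by the DLQ writer in Stage 4.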
    String dlqDirectory = dlqManager.getRetryDlqDirectoryWithDateTime();
    String tempDlqDir = dlqManager.getRetryDlqDirectory() + "tmp/";

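    // Failsafe UDF transformer: optionally applies the configured JavaScript and/or Python
    // UDF, then converts the JSON payloads to TableRows. Elements that fail either step are
    // emitted on dead-letter outputs that are written to the DLQ in Stage 4.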
    InputUDFToTableRow<String> failsafeTableRowTransformer =
        new InputUDFToTableRow<String>(
            options.getJavascriptTextTransformGcsPath(),
            options.getJavascriptTextTransformFunctionName(),
            options.getJavascriptTextTransformReloadIntervalMinutes(),
            options.getPythonTextTransformGcsPath(),
            options.getPythonTextTransformFunctionName(),
            options.getRuntimeRetries(),
            FAILSAFE_ELEMENT_CODER);

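    // Stateful cleaner applied below as "UDF to TableRow/Oracle Cleaner"; rows it cannot
    // clean are emitted on its failure tag and routed to the DLQ in Stage 4.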
    StatefulRowCleaner statefulCleaner = StatefulRowCleaner.of();

    /*
     * Stage 1: Ingest and Normalize Data to FailsafeElement with JSON Strings
     *   a) Read DataStream data from GCS into JSON String FailsafeElements (datastreamJsonRecords)
     *   b) Reconsume Dead Letter Queue data from GCS into JSON String FailsafeElements
     *     (dlqJsonRecords)
     *   c) Flatten DataStream and DLQ Streams (jsonRecords)
     */
    PCollection<FailsafeElement<String, String>> datastreamJsonRecords =
        pipeline.apply(
            new DataStreamIO(
                    options.getStreamName(),
                    options.getInputFilePattern(),
                    options.getInputFileFormat(),
                    options.getGcsPubSubSubscription(),
                    options.getRfcStartDateTime())
                .withFileReadConcurrency(options.getFileReadConcurrency()));

    // Elements previously sent to the Dead Letter Queue are reconsumed here.
    // The DLQManager built above from the pipeline options is responsible for
    // constructing the DLQ paths and the reconsumer transform.
    PCollection<FailsafeElement<String, String>> dlqJsonRecords =
        pipeline
            .apply("DLQ Consumer/reader", dlqManager.dlqReconsumer(options.getDlqRetryMinutes()))
            .apply(
                "DLQ Consumer/cleaner",
                ParDo.of(
                    new DoFn<String, FailsafeElement<String, String>>() {
                      @ProcessElement
                      public void process(
                          @Element String input,
                          OutputReceiver<FailsafeElement<String, String>> receiver) {
                        receiver.output(FailsafeElement.of(input, input));
                      }
                    }))
            .setCoder(FAILSAFE_ELEMENT_CODER);

    PCollection<FailsafeElement<String, String>> jsonRecords =
        PCollectionList.of(datastreamJsonRecords)
            .and(dlqJsonRecords)
            .apply("Merge Datastream & DLQ", Flatten.pCollections());

    /*
     * Stage 2: Write JSON Strings to TableRow PCollectionTuple
     *   a) Optionally apply a Javascript or Python UDF
     *   b) Convert JSON String FailsafeElements to TableRows (tableRowRecords)
     */
    PCollectionTuple tableRowRecords =
        jsonRecords.apply("UDF to TableRow/udf", failsafeTableRowTransformer);

    PCollectionTuple cleanedRows =
        tableRowRecords
            .get(failsafeTableRowTransformer.transformOut)
            .apply("UDF to TableRow/Oracle Cleaner", statefulCleaner);

    PCollection<TableRow> shuffledTableRows =
        cleanedRows
            .get(statefulCleaner.successTag)
            .apply(
                "UDF to TableRow/ReShuffle",
                Reshuffle.<TableRow>viaRandomKey().withNumBuckets(100));
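    // The reshuffle above redistributes rows across 100 random-key buckets, breaking fusion so
    // that the BigQuery output stages below are parallelized independently of the file reads.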

    /*
     * Stage 3: BigQuery Output of TableRow Data
     *   a) Map New Columns & Write to Staging Tables (writeResult)
     *   b) Map New Columns & Merge Staging to Target Table (no captured output)
     *
     *   failsafe: writeResult.getFailedInsertsWithErr()
     */
    // TODO(beam 2.23): InsertRetryPolicy should be CDC compliant
    Set<String> fieldsToIgnore = getFieldsToIgnore(options.getIgnoreFields());
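    // Fields named in the ignoreFields option are excluded from schema mapping
    // (withIgnoreFields) and stripped from the rows written to BigQuery (removeTableRowFields).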

    WriteResult writeResult =
        shuffledTableRows
            .apply(
                "Map to Staging Tables",
                new DataStreamMapper(
                        options.as(GcpOptions.class),
                        options.getOutputProjectId(),
                        options.getOutputStagingDatasetTemplate(),
                        options.getOutputStagingTableNameTemplate())
                    .withDataStreamRootUrl(options.getDataStreamRootUrl())
                    .withDefaultSchema(BigQueryDefaultSchemas.DATASTREAM_METADATA_SCHEMA)
                    .withDayPartitioning(true)
                    .withIgnoreFields(fieldsToIgnore))
            .apply(
                "Write Successful Records",
                BigQueryIO.<KV<TableId, TableRow>>write()
                    .to(new BigQueryDynamicConverters().bigQueryDynamicDestination())
                    .withFormatFunction(
                        element -> removeTableRowFields(element.getValue(), fieldsToIgnore))
                    .withFormatRecordOnFailureFunction(element -> element.getValue())
                    .withoutValidation()
                    .ignoreInsertIds()
                    .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                    .withExtendedErrorInfo() // takes effect only when Storage Write API is off
                    .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
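    // Staging writes append to existing tables (CREATE_NEVER); transient insert errors are
    // retried, and remaining failures surface via getFailedInsertsWithErr() for Stage 4.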

    if (options.getApplyMerge()) {
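      // With merge enabled, staged change records are periodically merged into the replica
      // (target) tables at the configured frequency and concurrency.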
      shuffledTableRows
          .apply(
              "Map To Replica Tables",
              new DataStreamMapper(
                      options.as(GcpOptions.class),
                      options.getOutputProjectId(),
                      options.getOutputDatasetTemplate(),
                      options.getOutputTableNameTemplate())
                  .withDataStreamRootUrl(options.getDataStreamRootUrl())
                  .withDefaultSchema(BigQueryDefaultSchemas.DATASTREAM_METADATA_SCHEMA)
                  .withIgnoreFields(fieldsToIgnore))
          .apply(
              "BigQuery Merge/Build MergeInfo",
              new MergeInfoMapper(
                  bigqueryProjectId,
                  options.getOutputStagingDatasetTemplate(),
                  options.getOutputStagingTableNameTemplate(),
                  options.getOutputDatasetTemplate(),
                  options.getOutputTableNameTemplate()))
          .apply(
              "BigQuery Merge/Merge into Replica Tables",
              BigQueryMerger.of(
                  MergeConfiguration.bigQueryConfiguration()
                      .withProjectId(bigqueryProjectId)
                      .withMergeWindowDuration(
                          Duration.standardMinutes(options.getMergeFrequencyMinutes()))
                      .withMergeConcurrency(options.getMergeConcurrency())
                      .withPartitionRetention(options.getPartitionRetentionDays())));
    }

    /*
     * Stage 4: Write Failures to GCS Dead Letter Queue
     */
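    // Three failure streams are sanitized to JSON and flattened into the DLQ: UDF/transform
    // failures, Oracle cleaner failures, and failed BigQuery inserts.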
    PCollection<String> udfDlqJson =
        PCollectionList.of(tableRowRecords.get(failsafeTableRowTransformer.udfDeadletterOut))
            .and(tableRowRecords.get(failsafeTableRowTransformer.transformDeadletterOut))
            .apply("Transform Failures/Flatten", Flatten.pCollections())
            .apply(
                "Transform Failures/Sanitize",
                MapElements.via(new StringDeadLetterQueueSanitizer()));

    PCollection<String> rowCleanerJson =
        cleanedRows
            .get(statefulCleaner.failureTag)
            .apply(
                "Transform Failures/Oracle Cleaner Failures",
                MapElements.via(new RowCleanerDeadLetterQueueSanitizer()));

    PCollection<String> bqWriteDlqJson =
        BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
            .apply("BigQuery Failures", MapElements.via(new BigQueryDeadLetterQueueSanitizer()));

    PCollectionList.of(udfDlqJson)
        .and(rowCleanerJson)
        .and(bqWriteDlqJson)
        .apply("Write To DLQ/Flatten", Flatten.pCollections())
        .apply(
            "Write To DLQ/Writer",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqDirectory)
                .withTmpDirectory(tempDlqDir)
                .setIncludePaneInfo(true)
                .build());

    // Execute the pipeline and return the result.
    return pipeline.run();
  }
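
For context, a caller typically builds the strongly typed Options from command-line arguments and passes them to run(). A minimal sketch of such an entry point, using Beam's standard PipelineOptionsFactory (the template's actual main method may differ):

  public static void main(String[] args) {
    // Sketch: parse and validate the pipeline options, then launch the pipeline.
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    run(options);
  }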