static PipelineResult run()

in v2/jdbc-and-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/JdbcToBigQuery.java [123:293]


  static PipelineResult run(JdbcToBigQueryOptions options, Write<TableRow> writeToBQ) {
    // Validate BQ STORAGE_WRITE_API options
    BigQueryIOUtils.validateBQStorageApiOptionsBatch(options);
    if (!options.getUseStorageWriteApi()
        && !options.getUseStorageWriteApiAtLeastOnce()
        && !Strings.isNullOrEmpty(options.getOutputDeadletterTable())) {
      throw new IllegalArgumentException(
          "outputDeadletterTable can only be specified if BigQuery Storage Write API is enabled either with useStorageWriteApi or useStorageWriteApiAtLeastOnce.");
    }

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    /*
     * Steps: 1) Read records via JDBC and convert to TableRow via RowMapper
     *        2) Append TableRow to BigQuery via BigQueryIO
     */
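    // Build the JDBC DataSource configuration. The connection URL, username, and password may be
    // supplied as Secret Manager references or as KMS-encrypted values; maybeParseSecret and
    // maybeDecrypt resolve them to plain text before the connection is created.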
    JdbcIO.DataSourceConfiguration dataSourceConfiguration =
        JdbcIO.DataSourceConfiguration.create(
                StaticValueProvider.of(options.getDriverClassName()),
                maybeDecrypt(
                    maybeParseSecret(options.getConnectionURL()), options.getKMSEncryptionKey()))
            .withUsername(
                maybeDecrypt(
                    maybeParseSecret(options.getUsername()), options.getKMSEncryptionKey()))
            .withPassword(
                maybeDecrypt(
                    maybeParseSecret(options.getPassword()), options.getKMSEncryptionKey()));

    if (options.getDriverJars() != null) {
      dataSourceConfiguration = dataSourceConfiguration.withDriverJars(options.getDriverJars());
    }

    if (options.getConnectionProperties() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withConnectionProperties(options.getConnectionProperties());
    }

    /*
     * Step 1: Read records via JDBC and convert to TableRow
     *         via {@link org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper}
     */
    PCollection<TableRow> rows;
    if (options.getPartitionColumn() != null && options.getTable() != null) {
      // Read with partitions: split the read into parallel ranges over the partition column.
      JdbcIO.ReadWithPartitions<TableRow, ?> readIO = null;
      final String partitionColumnType = options.getPartitionColumnType();
      if (partitionColumnType == null || "long".equals(partitionColumnType)) {
        JdbcIO.ReadWithPartitions<TableRow, Long> longTypeReadIO =
            JdbcIO.<TableRow, Long>readWithPartitions(TypeDescriptors.longs())
                .withDataSourceConfiguration(dataSourceConfiguration)
                .withTable(options.getTable())
                .withPartitionColumn(options.getPartitionColumn())
                .withRowMapper(JdbcConverters.getResultSetToTableRow(options.getUseColumnAlias()));
        if (options.getLowerBound() != null && options.getUpperBound() != null) {
          // Validate that lowerBound and upperBound are parseable as longs.
          try {
            longTypeReadIO =
                longTypeReadIO
                    .withLowerBound(Long.valueOf(options.getLowerBound()))
                    .withUpperBound(Long.valueOf(options.getUpperBound()));
          } catch (NumberFormatException e) {
            throw new NumberFormatException(
                "Expected Long values for lowerBound and upperBound, received : " + e.getMessage());
          }
        }
        readIO = longTypeReadIO;
      } else if ("datetime".equals(partitionColumnType)) {
        JdbcIO.ReadWithPartitions<TableRow, DateTime> dateTimeReadIO =
            JdbcIO.<TableRow, DateTime>readWithPartitions(TypeDescriptor.of(DateTime.class))
                .withDataSourceConfiguration(dataSourceConfiguration)
                .withTable(options.getTable())
                .withPartitionColumn(options.getPartitionColumn())
                .withRowMapper(JdbcConverters.getResultSetToTableRow(options.getUseColumnAlias()));
        if (options.getLowerBound() != null && options.getUpperBound() != null) {
          DateTimeFormatter dateFormatter =
              DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSZ").withOffsetParsed();
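          // Illustrative bound value matching this pattern (not from the template's docs):
          // "2024-01-15 00:00:00.000+0000"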
          // Validate that lowerBound and upperBound parse as DateTime values.
          try {
            dateTimeReadIO =
                dateTimeReadIO
                    .withLowerBound(dateFormatter.parseDateTime(options.getLowerBound()))
                    .withUpperBound(dateFormatter.parseDateTime(options.getUpperBound()));
          } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                "Expected DateTime values in the format for lowerBound and upperBound, received : "
                    + e.getMessage());
          }
        }
        readIO = dateTimeReadIO;
      } else {
        throw new IllegalStateException(
            "Unsupported partitionColumnType; expected 'long' or 'datetime'.");
      }
      if (options.getNumPartitions() != null) {
        readIO = readIO.withNumPartitions(options.getNumPartitions());
      }
      if (options.getFetchSize() != null && options.getFetchSize() > 0) {
        readIO = readIO.withFetchSize(options.getFetchSize());
      }

      rows = pipeline.apply("Read from JDBC with Partitions", readIO);
    } else {
      if (options.getQuery() == null) {
        throw new IllegalArgumentException(
            "Either 'query' or both 'table' AND 'PartitionColumn' must be specified to read from JDBC");
      }
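      // Read with a single query. GCSAwareValueProvider wraps the query value; judging by its
      // name, it allows the query to be supplied as a Cloud Storage path that is read at runtime
      // rather than as inline SQL.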
      JdbcIO.Read<TableRow> readIO =
          JdbcIO.<TableRow>read()
              .withDataSourceConfiguration(dataSourceConfiguration)
              .withQuery(new GCSAwareValueProvider(options.getQuery()))
              .withCoder(TableRowJsonCoder.of())
              .withRowMapper(JdbcConverters.getResultSetToTableRow(options.getUseColumnAlias()));

      if (options.getFetchSize() != null && options.getFetchSize() > 0) {
        readIO = readIO.withFetchSize(options.getFetchSize());
      }

      rows = pipeline.apply("Read from JdbcIO", readIO);
    }

    /*
     * Step 2: Append TableRow to an existing BigQuery table
     */
    WriteResult writeResult = rows.apply("Write to BigQuery", writeToBQ);

    /*
     * Step 3.
     * If using Storage Write API, capture failed inserts and either
     *   a) write error rows to DLQ
     *   b) fail the pipeline
     */
    if (options.getUseStorageWriteApi() || options.getUseStorageWriteApiAtLeastOnce()) {
      PCollection<BigQueryInsertError> insertErrors =
          BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options);

      if (!Strings.isNullOrEmpty(options.getOutputDeadletterTable())) {
        /*
         * Step 3a.
         * Elements that failed to insert into BigQuery are extracted and converted to FailsafeElement
         */
        PCollection<FailsafeElement<String, String>> failedInserts =
            insertErrors
                .apply(
                    "WrapInsertionErrors",
                    MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                        .via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
                .setCoder(FAILSAFE_ELEMENT_CODER);

        /*
         * Step 3a Contd.
         * Write records that failed to insert into the deadletter table
         */
        failedInserts.apply(
            "WriteFailedRecords",
            ErrorConverters.WriteStringMessageErrors.newBuilder()
                .setErrorRecordsTable(options.getOutputDeadletterTable())
                .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
                .setUseWindowedTimestamp(false)
                .build());
      } else {
        /*
         * Step 3b.
         * Fail pipeline upon write errors if no DLQ was specified
         */
        insertErrors.apply(ParDo.of(new ThrowWriteErrorsDoFn()));
      }
    }

    // Execute the pipeline and return the result.
    return pipeline.run();
  }
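
For context, a caller such as the template's main method is expected to build the options and the
BigQuery write transform and then delegate to run(). The sketch below is illustrative only: the
option accessors beyond those used in run() (for example getOutputTable()) and the exact write
configuration are assumptions rather than the template's actual main(), and the usual Beam imports
(PipelineOptionsFactory, BigQueryIO, TableRow) are presumed.

  public static void main(String[] args) {
    // Parse and validate template options from the command-line / template parameters.
    JdbcToBigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(JdbcToBigQueryOptions.class);

    // A plain append into an existing BigQuery table; the real template may configure schema
    // handling, dispositions, and the Storage Write API differently.
    BigQueryIO.Write<TableRow> writeToBQ =
        BigQueryIO.writeTableRows()
            .to(options.getOutputTable()) // assumed accessor for the 'outputTable' parameter
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);

    run(options, writeToBQ);
  }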