private static PCollection&lt;HashResult&gt; getJDBCRecordsHelper()

in spanner-data-validator-java/src/main/java/com/google/migration/JDBCToSpannerDVTWithHash.java [508:585]


  /**
   * Reads source-database rows for {@code tableName} in parallel across the supplied partition
   * ranges and returns them as {@link HashResult}s.
   *
   * <p>When a {@link CustomTransformation} is supplied, rows are first read as raw
   * {@link SourceRecord}s and then converted to {@code HashResult}s by a
   * {@link CustomTransformationDoFn}; otherwise rows are hashed directly by a
   * {@link JDBCRowMapper} inside the JDBC read.
   *
   * @param tableName table being read; also used to label pipeline steps
   * @param pipelineTracker NOTE(review): unused in this method body — kept for signature
   *     compatibility with callers; confirm whether it can be dropped
   * @param query parameterized SQL with two positional placeholders for the range bounds
   * @param keyIndex index of the range/key column within the result row
   * @param rangeFieldType declared type of the range column (drives key parsing)
   * @param options pipeline options (JDBC pool sizing, timestamp precision adjustment)
   * @param pRanges partition ranges to read; each range binds the query's two placeholders
   * @param timestampThresholdKeyIndex index of the timestamp-threshold column, if any
   * @param customTransformation optional row transformation; selects the read path
   * @param schema source schema handed to the custom transformation
   * @param driver JDBC driver class name
   * @param connString JDBC connection string
   * @param shardId logical shard identifier passed to the custom transformation
   * @param username JDBC username
   * @param jdbcPass JDBC password
   * @return hashed rows for the table
   */
  private static PCollection<HashResult> getJDBCRecordsHelper(String tableName,
      PipelineTracker pipelineTracker,
      String query,
      Integer keyIndex,
      String rangeFieldType,
      DVTOptionsCore options,
      PCollection<PartitionRange> pRanges,
      Integer timestampThresholdKeyIndex,
      CustomTransformation customTransformation,
      Schema schema,
      String driver,
      String connString,
      String shardId,
      String username,
      String jdbcPass) {

    // Spread partition ranges across workers before the JDBC read so connections are
    // distributed; both read paths share this step (only one branch runs per invocation,
    // so the duplicate step name is never registered twice).
    PCollection<PartitionRange> reshuffled =
        pRanges.apply(String.format("ReshuffleJDBCForTable-%s", tableName),
            Reshuffle.viaRandomKey());

    if (customTransformation != null) {
      // Read raw source records, then hash them via the custom transformation.
      PCollection<SourceRecord> jdbcRecordsSR =
          reshuffled.apply(String.format("ReadInParallelForTable-%s", tableName),
              buildPartitionedReadAll(query, options, driver, connString, username, jdbcPass,
                  new SourceRecordMapper()));

      CustomTransformationDoFn customTransformationDoFn = CustomTransformationDoFn.create(
          customTransformation,
          tableName,
          shardId,
          schema,
          keyIndex,
          rangeFieldType,
          options.getAdjustTimestampPrecision(),
          timestampThresholdKeyIndex);

      return jdbcRecordsSR.apply(String.format("CustomTransformationForTable-%s", tableName),
          ParDo.of(customTransformationDoFn));
    } else {
      // No transformation: hash rows directly while mapping them out of the JDBC read.
      return reshuffled.apply(String.format("ReadInParallelForTable-%s", tableName),
          buildPartitionedReadAll(query, options, driver, connString, username, jdbcPass,
              new JDBCRowMapper(
                  keyIndex,
                  rangeFieldType,
                  options.getAdjustTimestampPrecision(),
                  timestampThresholdKeyIndex)));
    }
  }

  /**
   * Builds the {@link JdbcIO#readAll} transform shared by both read paths of
   * {@link #getJDBCRecordsHelper}: a pooled data source, the partition-bounded query, and the
   * caller-supplied row mapper. Extracted so the JDBC configuration is defined once.
   *
   * @param <T> element type produced by {@code rowMapper}
   * @param query SQL with two positional placeholders bound to each range's start/end
   * @param options source of the per-JVM connection cap
   * @param driver JDBC driver class name
   * @param connString JDBC connection string
   * @param username JDBC username
   * @param jdbcPass JDBC password
   * @param rowMapper maps each JDBC row to an output element
   * @return configured readAll transform keyed by {@link PartitionRange}
   */
  private static <T> JdbcIO.ReadAll<PartitionRange, T> buildPartitionedReadAll(
      String query,
      DVTOptionsCore options,
      String driver,
      String connString,
      String username,
      String jdbcPass,
      JdbcIO.RowMapper<T> rowMapper) {
    return JdbcIO.<PartitionRange, T>readAll()
        .withDataSourceProviderFn(
            JdbcIO.PoolableDataSourceProvider.of(DataSourceConfiguration.create(
                    driver, connString)
                .withUsername(username)
                .withPassword(jdbcPass)
                .withMaxConnections(options.getMaxJDBCConnectionsPerJVM())))
        .withQuery(query)
        // Bind the partition boundaries to the query's two placeholders.
        .withParameterSetter((input, preparedStatement) -> {
          preparedStatement.setString(1, input.getStartRange());
          preparedStatement.setString(2, input.getEndRange());
        })
        .withRowMapper(rowMapper)
        // Parallelism comes from the upstream reshuffle; avoid an extra fusion break here.
        .withOutputParallelization(false);
  }