in spanner-data-validator-java/src/main/java/com/google/migration/JDBCToSpannerDVTWithHash.java [508:585]
/**
 * Builds the JDBC read branch for one table and returns its rows as {@link HashResult}s.
 *
 * <p>Each {@link PartitionRange} in {@code pRanges} drives one execution of {@code query}. When
 * {@code customTransformation} is {@code null}, rows are mapped straight to {@link HashResult}
 * by a {@link JDBCRowMapper}. Otherwise rows are first read as {@link SourceRecord}s via
 * {@link SourceRecordMapper} and then passed through a {@link CustomTransformationDoFn}.
 *
 * @param tableName table name; only used here to build unique transform step names
 * @param pipelineTracker not used by this method; kept so the signature stays stable for callers
 * @param query SQL with two positional parameters: (1) range start, (2) range end
 * @param keyIndex forwarded to the row mapper / custom transformation DoFn
 * @param rangeFieldType forwarded to the row mapper / custom transformation DoFn
 * @param options supplies the max JDBC connections per JVM and the timestamp precision flag
 * @param pRanges partition ranges to read
 * @param timestampThresholdKeyIndex forwarded to the row mapper / custom transformation DoFn
 * @param customTransformation optional custom transformation; {@code null} selects the direct
 *     {@link JDBCRowMapper} path
 * @param schema forwarded to the custom transformation DoFn (unused on the direct path)
 * @param driver JDBC driver class name
 * @param connString JDBC connection URL
 * @param shardId forwarded to the custom transformation DoFn (unused on the direct path)
 * @param username JDBC user name
 * @param jdbcPass JDBC password
 * @return the hashed records read from the JDBC source
 */
private static PCollection<HashResult> getJDBCRecordsHelper(String tableName,
    PipelineTracker pipelineTracker,
    String query,
    Integer keyIndex,
    String rangeFieldType,
    DVTOptionsCore options,
    PCollection<PartitionRange> pRanges,
    Integer timestampThresholdKeyIndex,
    CustomTransformation customTransformation,
    Schema schema,
    String driver,
    String connString,
    String shardId,
    String username,
    String jdbcPass) {
  if (customTransformation == null) {
    // Direct path: the row mapper itself produces HashResult elements.
    return readPartitionRangesViaJdbc(
        tableName,
        query,
        options,
        pRanges,
        new JDBCRowMapper(
            keyIndex,
            rangeFieldType,
            options.getAdjustTimestampPrecision(),
            timestampThresholdKeyIndex),
        driver,
        connString,
        username,
        jdbcPass);
  }

  // Custom transformation path: read raw source records, then let the DoFn emit HashResults.
  PCollection<SourceRecord> jdbcRecordsSR =
      readPartitionRangesViaJdbc(
          tableName,
          query,
          options,
          pRanges,
          new SourceRecordMapper(),
          driver,
          connString,
          username,
          jdbcPass);

  CustomTransformationDoFn customTransformationDoFn = CustomTransformationDoFn.create(
      customTransformation,
      tableName,
      shardId,
      schema,
      keyIndex,
      rangeFieldType,
      options.getAdjustTimestampPrecision(),
      timestampThresholdKeyIndex);

  return jdbcRecordsSR.apply(
      String.format("CustomTransformationForTable-%s", tableName),
      ParDo.of(customTransformationDoFn));
}

/**
 * Reshuffles the partition ranges and runs {@code query} once per range through
 * {@code JdbcIO.readAll()}, mapping every result row with {@code rowMapper}.
 *
 * <p>Shared by both branches of {@code getJDBCRecordsHelper} so that connection, query and
 * parameter-binding configuration live in exactly one place.
 *
 * @param <T> element type produced by {@code rowMapper}
 */
private static <T> PCollection<T> readPartitionRangesViaJdbc(String tableName,
    String query,
    DVTOptionsCore options,
    PCollection<PartitionRange> pRanges,
    JdbcIO.RowMapper<T> rowMapper,
    String driver,
    String connString,
    String username,
    String jdbcPass) {
  return pRanges
      .apply(String.format("ReshuffleJDBCForTable-%s", tableName),
          Reshuffle.viaRandomKey())
      .apply(String.format("ReadInParallelForTable-%s", tableName),
          JdbcIO.<PartitionRange, T>readAll()
              .withDataSourceProviderFn(
                  JdbcIO.PoolableDataSourceProvider.of(
                      DataSourceConfiguration.create(driver, connString)
                          .withUsername(username)
                          .withPassword(jdbcPass)
                          .withMaxConnections(options.getMaxJDBCConnectionsPerJVM())))
              .withQuery(query)
              // Range bounds are always bound as strings, whatever the range field type is.
              .withParameterSetter((input, preparedStatement) -> {
                preparedStatement.setString(1, input.getStartRange());
                preparedStatement.setString(2, input.getEndRange());
              })
              .withRowMapper(rowMapper)
              // Ranges were already reshuffled above; skip JdbcIO's own output reshuffle.
              .withOutputParallelization(false));
}