in spanner-data-validator-java/src/main/java/com/google/migration/JDBCToSpannerDVTWithHash.java [587:646]
/**
 * Builds one hashed JDBC read per shard in the sharding configuration and flattens the per-shard
 * results into a single collection.
 *
 * <p>The shard list is read from {@code options.getSourceConfigURL()}. For each shard, a JDBC URL
 * of the form {@code jdbc:<protocol>://<host>:<port>/<dbName>} is built. It is optionally suffixed
 * with {@code ?zeroDateTimeBehavior=CONVERT_TO_NULL}. The read itself is delegated to
 * {@code getJDBCRecordsHelper} together with the shard's logical shard id, user name and password.
 *
 * @param tableName table being validated; also used to name the flatten step
 * @param pipelineTracker forwarded unchanged to each per-shard read
 * @param query forwarded unchanged to each per-shard read
 * @param keyIndex forwarded unchanged to each per-shard read
 * @param rangeFieldType forwarded unchanged to each per-shard read
 * @param options supplies the protocol ({@code "mysql"} selects the MySQL driver, anything else
 *     the PostgreSQL driver), the sharding config URL and the zero-date-time flag
 * @param pRanges partition ranges, forwarded unchanged to each per-shard read
 * @param timestampThresholdIndex forwarded unchanged to each per-shard read
 * @param customTransformation forwarded unchanged to each per-shard read
 * @param schema forwarded unchanged to each per-shard read
 * @return the flattened union of the hashed records read from every shard
 * @throws IllegalArgumentException if the sharding configuration contains no shards
 */
protected static PCollection<HashResult> getJDBCRecordsWithSharding(String tableName,
    PipelineTracker pipelineTracker,
    String query,
    Integer keyIndex,
    String rangeFieldType,
    DVTOptionsCore options,
    PCollection<PartitionRange> pRanges,
    Integer timestampThresholdIndex,
    CustomTransformation customTransformation,
    Schema schema) {
  // Any protocol other than "mysql" is treated as PostgreSQL.
  String driver =
      options.getProtocol().equals("mysql") ? MYSQL_JDBC_DRIVER : POSTGRES_JDBC_DRIVER;

  String shardConfigurationFileUrl = options.getSourceConfigURL();
  List<Shard> shards = new ShardFileReader(new SecretManagerAccessorImpl())
      .readShardingConfig(shardConfigurationFileUrl);
  LOG.info("Total shards read: {}", shards.size());
  // PCollectionList.of(...) rejects an empty list with a generic message; fail early with one
  // that points at the actual misconfiguration (same exception type as before).
  if (shards.isEmpty()) {
    throw new IllegalArgumentException(String.format(
        "No shards found in sharding configuration '%s'; cannot read table '%s'",
        shardConfigurationFileUrl, tableName));
  }

  // https://stackoverflow.com/questions/68353660/zero-date-value-prohibited-hibernate-sql-jpa
  // The suffix is identical for every shard, so compute it once outside the loop.
  String zeroDateTimeNullBehaviorStr =
      options.getZeroDateTimeBehavior() ? "?zeroDateTimeBehavior=CONVERT_TO_NULL" : "";

  List<PCollection<HashResult>> pCollections = new ArrayList<>(shards.size());
  for (Shard shard : shards) {
    // JDBC conn string
    String connString = String.format("jdbc:%s://%s:%d/%s%s", options.getProtocol(),
        shard.getHost(),
        Integer.parseInt(shard.getPort()),
        shard.getDbName(),
        zeroDateTimeNullBehaviorStr);
    pCollections.add(getJDBCRecordsHelper(
        tableName,
        pipelineTracker,
        query,
        keyIndex,
        rangeFieldType,
        options,
        pRanges,
        timestampThresholdIndex,
        customTransformation,
        schema,
        driver,
        connString,
        shard.getLogicalShardId(),
        shard.getUserName(),
        shard.getPassword()));
  } // for

  String flattenStepName = String.format("FlattenJDBCRecordsForTable-%s", tableName);
  return PCollectionList.of(pCollections).apply(flattenStepName, Flatten.pCollections());
}