protected static PCollection getJDBCRecordsWithSharding()

in spanner-data-validator-java/src/main/java/com/google/migration/JDBCToSpannerDVTWithHash.java [587:646]


  /**
   * Builds the hashed JDBC records for {@code tableName} across every shard listed in the
   * sharding configuration and merges them into a single collection.
   *
   * <p>The shard list is read from the file at {@code options.getSourceConfigURL()}. For each
   * shard a JDBC URL of the form {@code jdbc:<protocol>://<host>:<port>/<dbName>} is built and
   * passed to {@code getJDBCRecordsHelper} together with the shard's logical shard id, user name
   * and password. The per-shard results are then combined with {@code Flatten.pCollections()}
   * under the step name {@code FlattenJDBCRecordsForTable-<tableName>}.
   *
   * <p>All parameters other than {@code options} and {@code tableName} are only forwarded,
   * unchanged, to {@code getJDBCRecordsHelper}; see that method for their exact semantics.
   *
   * @param tableName table being read; also used in the flatten step name
   * @param pipelineTracker forwarded to {@code getJDBCRecordsHelper}
   * @param query forwarded to {@code getJDBCRecordsHelper}
   * @param keyIndex forwarded to {@code getJDBCRecordsHelper}
   * @param rangeFieldType forwarded to {@code getJDBCRecordsHelper}
   * @param options supplies the protocol, the sharding config URL and the zero-date-time flag
   * @param pRanges forwarded to {@code getJDBCRecordsHelper}
   * @param timestampThresholdIndex forwarded to {@code getJDBCRecordsHelper}
   * @param customTransformation forwarded to {@code getJDBCRecordsHelper}
   * @param schema forwarded to {@code getJDBCRecordsHelper}
   * @return the flattened union of the per-shard hashed records
   * @throws IllegalArgumentException if the sharding configuration contains no shards
   * @throws NumberFormatException if a shard's port cannot be parsed as an integer
   */
  protected static PCollection<HashResult> getJDBCRecordsWithSharding(String tableName,
      PipelineTracker pipelineTracker,
      String query,
      Integer keyIndex,
      String rangeFieldType,
      DVTOptionsCore options,
      PCollection<PartitionRange> pRanges,
      Integer timestampThresholdIndex,
      CustomTransformation customTransformation,
      Schema schema) {

    // The protocol is the same for every shard, so resolve it (and the driver) once.
    String protocol = options.getProtocol();
    String driver = "mysql".equals(protocol) ? MYSQL_JDBC_DRIVER : POSTGRES_JDBC_DRIVER;

    String shardConfigurationFileUrl = options.getSourceConfigURL();

    List<Shard> shards = new ShardFileReader(new SecretManagerAccessorImpl())
        .readShardingConfig(shardConfigurationFileUrl);
    LOG.info("Total shards read: {}", shards.size());

    // PCollectionList.of(Iterable) rejects an empty iterable with a generic message; fail here
    // instead so the error names the table and the configuration file that produced no shards.
    if (shards.isEmpty()) {
      throw new IllegalArgumentException(String.format(
          "No shards found in sharding configuration '%s'; cannot read JDBC records for table %s",
          shardConfigurationFileUrl,
          tableName));
    }

    // https://stackoverflow.com/questions/68353660/zero-date-value-prohibited-hibernate-sql-jpa
    // Loop-invariant, so computed once rather than per shard.
    // NOTE(review): the suffix is appended whenever the flag is set, regardless of protocol.
    String zeroDateTimeNullBehaviorStr =
        options.getZeroDateTimeBehavior() ? "?zeroDateTimeBehavior=CONVERT_TO_NULL" : "";

    List<PCollection<HashResult>> pCollections = new ArrayList<>(shards.size());

    for (Shard shard : shards) {
      // JDBC conn string; parseInt also validates that the configured port is numeric.
      String connString = String.format("jdbc:%s://%s:%d/%s%s",
          protocol,
          shard.getHost(),
          Integer.parseInt(shard.getPort()),
          shard.getDbName(),
          zeroDateTimeNullBehaviorStr);

      PCollection<HashResult> hashedJDBCRecordsPerShard = getJDBCRecordsHelper(
          tableName,
          pipelineTracker,
          query,
          keyIndex,
          rangeFieldType,
          options,
          pRanges,
          timestampThresholdIndex,
          customTransformation,
          schema,
          driver,
          connString,
          shard.getLogicalShardId(),
          shard.getUserName(),
          shard.getPassword());

      pCollections.add(hashedJDBCRecordsPerShard);
    }

    // Merge the per-shard collections into one collection for the table.
    String flattenStepName = String.format("FlattenJDBCRecordsForTable-%s", tableName);
    return PCollectionList.of(pCollections).apply(flattenStepName, Flatten.pCollections());
  }