in spark-job/src/main/java/org/apache/cassandra/diff/Differ.java [208:236]
/**
 * Diffs one table over the token range held by {@code context} and returns the resulting stats.
 *
 * <p>Partition keys between {@code context.startToken} and {@code context.endToken} are listed
 * from the source cluster and then from the target cluster. A {@link RangeComparator} consumes
 * both key iterators. For each partition the comparator needs, a {@link PartitionComparator} is
 * built over the rows fetched from both clusters. Which partitions are included is decided by
 * the predicate returned from {@code shouldIncludePartition(jobId, partitionSamplingProbability)}.
 *
 * @param context                per-range diff context (source/target clusters, table, token bounds)
 * @param partitionErrorReporter callback handed to the range comparator for per-partition errors
 * @param mismatchReporter       callback handed to the range comparator for detected mismatches
 * @param journal                callback handed to the range comparator for stats checkpoints
 * @return the stats produced by {@link RangeComparator#compare} for this table and range
 */
public RangeStats diffTable(final DiffContext context,
                            final BiConsumer<Throwable, BigInteger> partitionErrorReporter,
                            final BiConsumer<MismatchType, BigInteger> mismatchReporter,
                            final BiConsumer<RangeStats, BigInteger> journal) {
    // List partition keys for the same table and token bounds on each side (source first).
    final Iterator<PartitionKey> sourceKeys =
            context.source.getPartitionKeys(context.table.getTable(), context.startToken, context.endToken);
    final Iterator<PartitionKey> targetKeys =
            context.target.getPartitionKeys(context.table.getTable(), context.startToken, context.endToken);

    // Builds a comparator for a single partition. The read direction is chosen anew for every
    // partition, and the same direction is used when reading the source and the target.
    final Function<PartitionKey, PartitionComparator> comparatorFactory = partitionKey -> {
        final boolean reversed = context.shouldReverse();
        final Iterator<Row> sourceRows = fetchRows(context, partitionKey, reversed, DiffCluster.Type.SOURCE);
        final Iterator<Row> targetRows = fetchRows(context, partitionKey, reversed, DiffCluster.Type.TARGET);
        return new PartitionComparator(context.table, sourceRows, targetRows, retryStrategyProvider);
    };

    final RangeComparator comparator = new RangeComparator(context,
                                                           partitionErrorReporter,
                                                           mismatchReporter,
                                                           journal,
                                                           COMPARISON_EXECUTOR);
    final Predicate<PartitionKey> sampler = shouldIncludePartition(jobId, partitionSamplingProbability);

    final RangeStats stats = comparator.compare(sourceKeys, targetKeys, comparatorFactory, sampler);
    logger.debug("Table [{}] stats - ({})", context.table.getTable(), stats);
    return stats;
}