public RangeStats diffTable()

in spark-job/src/main/java/org/apache/cassandra/diff/Differ.java [208:236]


    public RangeStats diffTable(final DiffContext context,
                                final BiConsumer<Throwable, BigInteger> partitionErrorReporter,
                                final BiConsumer<MismatchType, BigInteger> mismatchReporter,
                                final BiConsumer<RangeStats, BigInteger> journal) {

        final Iterator<PartitionKey> sourceKeys = context.source.getPartitionKeys(context.table.getTable(),
                                                                                  context.startToken,
                                                                                  context.endToken);
        final Iterator<PartitionKey> targetKeys = context.target.getPartitionKeys(context.table.getTable(),
                                                                                  context.startToken,
                                                                                  context.endToken);
        final Function<PartitionKey, PartitionComparator> partitionTaskProvider =
            (key) -> {
                boolean reverse = context.shouldReverse();
                Iterator<Row> source = fetchRows(context, key, reverse, DiffCluster.Type.SOURCE);
                Iterator<Row> target = fetchRows(context, key, reverse, DiffCluster.Type.TARGET);
                return new PartitionComparator(context.table, source, target, retryStrategyProvider);
            };

        RangeComparator rangeComparator = new RangeComparator(context,
                                                              partitionErrorReporter,
                                                              mismatchReporter,
                                                              journal,
                                                              COMPARISON_EXECUTOR);
        final Predicate<PartitionKey> partitionSamplingFunction = shouldIncludePartition(jobId, partitionSamplingProbability);
        final RangeStats tableStats = rangeComparator.compare(sourceKeys, targetKeys, partitionTaskProvider, partitionSamplingFunction);
        logger.debug("Table [{}] stats - ({})", context.table.getTable(), tableStats);
        return tableStats;
    }