public RangeStats compare()

in spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java [73:187]


    /**
     * Walks the source and target partition key iterators side by side for the token range
     * described by {@code context} and accumulates the outcome in a fresh {@link RangeStats}.
     *
     * <p>Keys are paired up using {@code compareTo}: a key that sorts before its counterpart is
     * recorded as existing on one side only and just that side is advanced. When both sides
     * yield the same key, the partition is either skipped (token not allowed by the context),
     * ignored (rejected by {@code partitionSampler}), or handed to {@code comparisonExecutor}
     * as an asynchronous comparison task. The method waits on a {@link Phaser} for submitted
     * tasks before it returns normally or propagates an iteration failure.
     *
     * <p>NOTE(review): the pairing logic presumes both iterators yield keys in the same
     * ascending {@code compareTo} order and that {@code nextKey} returns {@code null} once an
     * iterator is exhausted - confirm against the callers and the helper.
     *
     * @param sourceKeys            partition keys read from the source side
     * @param targetKeys            partition keys read from the target side
     * @param partitionTaskProvider builds the comparison task for a key present on both sides
     * @param partitionSampler      decides whether a key present on both sides is compared at all
     * @return the stats collected for this range; empty when the range's start and end tokens
     *         are equal
     * @throws RuntimeException wrapping any exception raised while iterating the keys
     */
    public RangeStats compare(Iterator<PartitionKey> sourceKeys,
                              Iterator<PartitionKey> targetKeys,
                              Function<PartitionKey, PartitionComparator> partitionTaskProvider,
                              Predicate<PartitionKey> partitionSampler) {

        final RangeStats stats = RangeStats.newStats();

        // This condition can be caught earlier, but re-checking it here is harmless.
        if (context.startToken.equals(context.endToken)) {
            return stats;
        }

        // One party is registered up front for this thread; it arrives once iteration is over.
        final Phaser inFlight = new Phaser(1);
        final AtomicLong comparedPartitions = new AtomicLong(0);
        final AtomicReference<BigInteger> maxTokenSeen = new AtomicReference<>(context.startToken);

        logger.info("Comparing range [{},{}]", context.startToken, context.endToken);
        try {
            PartitionKey fromSource = nextKey(sourceKeys);
            PartitionKey fromTarget = nextKey(targetKeys);

            // Start-of-range special cases: exactly one of the clusters supplied an empty range,
            // so everything on the other side exists on that side only.
            if (fromSource == null && fromTarget != null) {
                logger.info("First in range, source iter is empty {}", context);
                onlyInTarget(stats, fromTarget);
                targetKeys.forEachRemaining(k -> onlyInTarget(stats, k));
                return stats;
            }
            if (fromSource != null && fromTarget == null) {
                logger.info("First in range, target iter is empty {}", context);
                onlyInSource(stats, fromSource);
                sourceKeys.forEachRemaining(k -> onlyInSource(stats, k));
                return stats;
            }

            while (fromSource != null && fromTarget != null) {
                final int order = fromSource.compareTo(fromTarget);

                if (order > 0) {
                    // The target key sorts first, so the source side does not have it.
                    onlyInTarget(stats, fromTarget);
                    fromTarget = nextKey(targetKeys);
                    continue;
                }
                if (order < 0) {
                    // The source key sorts first, so the target side does not have it.
                    onlyInSource(stats, fromSource);
                    fromSource = nextKey(sourceKeys);
                    continue;
                }

                Verify.verify(fromSource.equals(fromTarget),
                              "Can only compare partitions with identical keys: (%s, %s)",
                              fromSource, fromTarget);

                if (!context.isTokenAllowed(fromSource.getTokenAsBigInteger())) {
                    logger.debug("Skipping disallowed token {}", fromSource.getTokenAsBigInteger());
                    stats.skipPartition();
                    fromSource = nextKey(sourceKeys);
                    fromTarget = nextKey(targetKeys);
                    continue;
                }

                // The key exists on both sides: launch an async task that walks the partition
                // and compares its rows. The callbacks receive the range stats and the
                // highest-token tracker so the outcome is folded in when the task finishes.
                final BigInteger token = fromSource.getTokenAsBigInteger();
                try {
                    // Only partitions accepted by the sampler are diffed; the rest are passed over.
                    if (partitionSampler.test(fromSource)) {
                        PartitionComparator task = partitionTaskProvider.apply(fromSource);
                        comparisonExecutor.submit(task,
                                                  onSuccess(stats, comparedPartitions, token, maxTokenSeen, mismatchReporter, journal),
                                                  onError(stats, token, errorReporter),
                                                  inFlight);
                    }
                } catch (Throwable t) {
                    // Failures while building the comparison task land here; this should cover
                    // timeouts and unavailables from the initial query that reads the partition.
                    // Failures while paging through the partition inside the task itself are
                    // dealt with by the onError callback instead.
                    recordError(stats, token, errorReporter, t);
                } finally {
                    // Always move both sides forward. If the cluster was shut down because the
                    // task failed, the underlying key iterators report that nothing is left.
                    fromSource = nextKey(sourceKeys);
                    fromTarget = nextKey(targetKeys);
                }
            }

            // The loop ends as soon as either side runs dry, so at most one key is still pending.
            if (fromSource != null) {
                onlyInSource(stats, fromSource);
            } else if (fromTarget != null) {
                onlyInTarget(stats, fromTarget);
            }

            drain(sourceKeys, targetKeys, stats);

        } catch (Exception e) {
            // Errors raised while iterating the underlying key result sets (via nextKey()) must
            // fail the whole range comparison. Before propagating, wait for every in-flight
            // partition comparison so that its onSuccess or onError callback has fired; that is
            // what lets the highest seen token and failed partitions be recorded for a safe re-run.
            logger.debug("Waiting for {} in flight tasks before propagating error", inFlight.getUnarrivedParties());
            inFlight.arriveAndAwaitAdvance();
            throw new RuntimeException(String.format("Error encountered during range comparison for [%s:%s]",
                                                     context.startToken, context.endToken),
                                       e);
        }

        logger.debug("Waiting for {} in flight tasks before returning", inFlight.getUnarrivedParties());
        inFlight.arriveAndAwaitAdvance();

        if (!stats.allMatches()) {
            logger.info("Segment [{}:{}] stats - ({})", context.startToken, context.endToken, stats);
        }

        return stats;
    }