in spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java [73:187]
/**
 * Compares the partition keys of one token range ({@code context.startToken} to
 * {@code context.endToken}) between the source and target clusters.
 * <p>
 * The two key streams are merged in lock-step:
 * <ul>
 *   <li>A key that exists on only one side is reported via {@code onlyInSource} or {@code onlyInTarget}.</li>
 *   <li>A key present on both sides whose token is rejected by {@code context.isTokenAllowed} is counted
 *       as skipped.</li>
 *   <li>Any other shared key accepted by {@code partitionSampler} gets a comparison task submitted to
 *       {@code comparisonExecutor}. The task's outcome is folded into the stats by the
 *       {@code onSuccess}/{@code onError} callbacks.</li>
 * </ul>
 * Before returning normally, or before rethrowing an iteration failure, the method blocks on a
 * {@link Phaser} until every submitted task has arrived. This guarantees that each callback has
 * fired. The early-return paths occur before any task is submitted.
 *
 * @param sourceKeys            partition keys read from the source cluster. They are assumed to be
 *                              ordered consistently with {@code PartitionKey#compareTo}, which the
 *                              merge relies on.
 * @param targetKeys            partition keys read from the target cluster, with the same ordering
 *                              assumption.
 * @param partitionTaskProvider builds the comparison task for a key found on both sides
 * @param partitionSampler      decides whether a shared key is actually compared. Keys it rejects
 *                              are passed over without being recorded here.
 * @return the accumulated stats for the range. An empty stats object is returned when the start
 *         and end tokens are equal.
 * @throws RuntimeException wrapping any {@code Exception} raised while iterating the keys, after
 *                          in-flight comparisons have completed
 */
public RangeStats compare(Iterator<PartitionKey> sourceKeys,
                          Iterator<PartitionKey> targetKeys,
                          Function<PartitionKey, PartitionComparator> partitionTaskProvider,
                          Predicate<PartitionKey> partitionSampler) {
    final RangeStats stats = RangeStats.newStats();

    // Degenerate range: nothing to compare. Callers may already filter this out.
    if (context.startToken.equals(context.endToken)) {
        return stats;
    }

    // The calling thread is the one initially registered party. The phaser is handed to every
    // submitted comparison task so this thread can later block until they have all arrived.
    final Phaser inFlight = new Phaser(1);
    final AtomicLong comparedPartitions = new AtomicLong(0);
    final AtomicReference<BigInteger> maxTokenSeen = new AtomicReference<>(context.startToken);
    logger.info("Comparing range [{},{}]", context.startToken, context.endToken);

    try {
        PartitionKey src = nextKey(sourceKeys);
        PartitionKey tgt = nextKey(targetKeys);

        // Exactly one cluster returned nothing for this range. Every key from the other cluster
        // is one-sided, and no comparison task is ever submitted.
        final boolean sourceExhausted = src == null;
        if (sourceExhausted ^ (tgt == null)) {
            if (sourceExhausted) {
                logger.info("First in range, source iter is empty {}", context);
                onlyInTarget(stats, tgt);
                targetKeys.forEachRemaining(k -> onlyInTarget(stats, k));
            } else {
                logger.info("First in range, target iter is empty {}", context);
                onlyInSource(stats, src);
                sourceKeys.forEachRemaining(k -> onlyInSource(stats, k));
            }
            return stats;
        }

        // Sorted merge: advance whichever side is behind until the keys line up.
        while (src != null && tgt != null) {
            final int order = src.compareTo(tgt);
            if (order < 0) {
                // The source key has no counterpart in the target.
                onlyInSource(stats, src);
                src = nextKey(sourceKeys);
                continue;
            }
            if (order > 0) {
                // The target key has no counterpart in the source.
                onlyInTarget(stats, tgt);
                tgt = nextKey(targetKeys);
                continue;
            }

            Verify.verify(src.equals(tgt),
                          "Can only compare partitions with identical keys: (%s, %s)",
                          src, tgt);

            final BigInteger token = src.getTokenAsBigInteger();
            if (!context.isTokenAllowed(token)) {
                logger.debug("Skipping disallowed token {}", token);
                stats.skipPartition();
                src = nextKey(sourceKeys);
                tgt = nextKey(targetKeys);
                continue;
            }

            try {
                // Only partitions accepted by the sampler are compared. The rest are passed over.
                if (partitionSampler.test(src)) {
                    final PartitionComparator task = partitionTaskProvider.apply(src);
                    comparisonExecutor.submit(task,
                                              onSuccess(stats, comparedPartitions, token, maxTokenSeen, mismatchReporter, journal),
                                              onError(stats, token, errorReporter),
                                              inFlight);
                }
            } catch (Throwable t) {
                // Failures while building or submitting the task land here. Per the original
                // author, that covers timeouts and unavailables from the initial full-partition
                // read. Failures while the task pages through the partition are reported via
                // the onError callback instead.
                recordError(stats, token, errorReporter, t);
            } finally {
                // Advance both sides whatever the outcome. If the cluster was shut down because
                // a task failed, the key iterators are expected to report no further elements.
                src = nextKey(sourceKeys);
                tgt = nextKey(targetKeys);
            }
        }

        // At most one side still holds a key the loop did not consume. Report it, then let
        // drain() handle whatever remains.
        if (src != null) {
            onlyInSource(stats, src);
        } else if (tgt != null) {
            onlyInTarget(stats, tgt);
        }
        drain(sourceKeys, targetKeys, stats);
    } catch (Exception e) {
        // Iteration failed, so the range comparison as a whole fails. Wait first for every
        // in-flight comparison so that each onSuccess/onError callback fires and the highest
        // seen token and failed partitions are recorded, which keeps a re-run safe.
        logger.debug("Waiting for {} in flight tasks before propagating error", inFlight.getUnarrivedParties());
        inFlight.arriveAndAwaitAdvance();
        throw new RuntimeException(String.format("Error encountered during range comparison for [%s:%s]",
                                                 context.startToken, context.endToken), e);
    }

    logger.debug("Waiting for {} in flight tasks before returning", inFlight.getUnarrivedParties());
    inFlight.arriveAndAwaitAdvance();
    if (!stats.allMatches()) {
        logger.info("Segment [{}:{}] stats - ({})", context.startToken, context.endToken, stats);
    }
    return stats;
}