in spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java [215:243]
/**
 * Builds a callback that folds one {@link PartitionStats} result into {@code rangeStats}.
 *
 * <p>Each invocation of the returned consumer, in order:
 * <ol>
 *   <li>accumulates the result into {@code rangeStats};</li>
 *   <li>if the result has a clustering mismatch or any mismatched values, reports
 *       {@code MismatchType.PARTITION_MISMATCH} for {@code currentToken} and counts a
 *       mismatched partition; otherwise counts a matched partition;</li>
 *   <li>raises {@code highestSeenToken} to {@code currentToken} when the latter is greater;</li>
 *   <li>increments {@code partitionCount} and, on every 10th increment, passes
 *       {@code rangeStats} and the current {@code highestSeenToken} value to {@code journal}.</li>
 * </ol>
 *
 * @param rangeStats       statistics object updated by every invocation
 * @param partitionCount   counter incremented once per invocation; drives the checkpoint cadence
 * @param currentToken     token this callback is bound to
 * @param highestSeenToken holder for the greatest token observed so far; its value must be
 *                         non-null, since it is compared against {@code currentToken}
 * @param mismatchReporter invoked with the mismatch type and {@code currentToken} on a mismatch
 * @param journal          invoked with {@code rangeStats} and the highest token at each checkpoint
 * @return the consumer to run on a {@link PartitionStats} result
 */
private Consumer<PartitionStats> onSuccess(final RangeStats rangeStats,
                                           final AtomicLong partitionCount,
                                           final BigInteger currentToken,
                                           final AtomicReference<BigInteger> highestSeenToken,
                                           final BiConsumer<MismatchType, BigInteger> mismatchReporter,
                                           final BiConsumer<RangeStats, BigInteger> journal) {
    return partitionStats -> {
        rangeStats.accumulate(partitionStats);

        final boolean mismatched = !partitionStats.allClusteringsMatch
                                   || partitionStats.mismatchedValues > 0;
        if (mismatched) {
            mismatchReporter.accept(MismatchType.PARTITION_MISMATCH, currentToken);
            rangeStats.mismatchedPartition();
        } else {
            rangeStats.matchedPartition();
        }

        // Raise highestSeenToken to currentToken only if currentToken is greater. If the
        // compare-and-set fails because the reference changed since it was read, re-read
        // and re-check; the stored value is never lowered.
        BigInteger observed;
        do {
            observed = highestSeenToken.get();
        } while (currentToken.compareTo(observed) > 0
                 && !highestSeenToken.compareAndSet(observed, currentToken));

        // checkpoint every 10 partitions
        final long processed = partitionCount.incrementAndGet();
        if (processed % 10 == 0) {
            journal.accept(rangeStats, highestSeenToken.get());
        }
    };
}