in spark-job/src/main/java/org/apache/cassandra/diff/Differ.java [150:206]
/**
 * Diffs this task's token split for every table selected by {@code filterTables}.
 *
 * <p>Progress is read from and written to the journal that {@code trackerProvider} hands out
 * for this split. Each selected table is processed in turn by {@link #diffSplitForTable}.
 * Once all tables are done, the per-table stats are passed to {@code updateMetrics} under a
 * prefix taken from the source cluster's id.
 *
 * @return the accumulated {@link RangeStats} of every table that was selected for diffing
 */
public Map<KeyspaceTablePair, RangeStats> run() {
    final JobMetadataDb.ProgressTracker tracker = trackerProvider.getTracker(journalSession, jobId, split);
    // The final argument is simply whether specificTokens is non-empty.
    final Map<KeyspaceTablePair, DiffJob.TaskStatus> pending =
        filterTables(keyspaceTables, split, tracker::getLastStatus, !specificTokens.isEmpty());
    final String prefix = srcDiffCluster.clusterId.name();
    logger.info("Diffing {} for tables {}", split, pending);

    pending.forEach((keyspaceTable, status) -> diffSplitForTable(tracker, prefix, keyspaceTable, status));

    // Each status's stats object was accumulated in place above, so this exposes the final totals.
    final Map<KeyspaceTablePair, RangeStats> statsByTable = pending.entrySet()
                                                                   .stream()
                                                                   .collect(Collectors.toMap(Map.Entry::getKey,
                                                                                             entry -> entry.getValue().stats));
    updateMetrics(prefix, statsByTable);
    return statsByTable;
}

/**
 * Diffs a single table over this task's split and records the outcome in the journal.
 *
 * <p>The diff resumes from the table's last journaled token when one exists. The stats
 * produced by {@code diffTable} are accumulated into {@code status.stats} in place.
 *
 * @param tracker       progress journal for this split
 * @param metricsPrefix prefix used to name the per-table split timer
 * @param keyspaceTable keyspace/table being diffed
 * @param status        last journaled status for the table
 */
private void diffSplitForTable(JobMetadataDb.ProgressTracker tracker,
                               String metricsPrefix,
                               KeyspaceTablePair keyspaceTable,
                               DiffJob.TaskStatus status) {
    final RangeStats accumulated = status.stats;

    // A last token equal to the split end means this split was already fully processed, and this
    // pass only re-checks partitions with errors. In that case the diff starts from the split
    // start, and the completed count is not bumped when the table is finished.
    final boolean rerun = split.end.equals(status.lastToken);
    final BigInteger from;
    if (rerun || status.lastToken == null) {
        from = split.start;
    } else {
        from = status.lastToken;
    }
    validateRange(from, split.end, tokenHelper);

    final TableSpec source = TableSpec.make(keyspaceTable, srcDiffCluster);
    final TableSpec target = TableSpec.make(keyspaceTable, targetDiffCluster);
    validateTableSpecs(source, target);

    final DiffContext context = new DiffContext(srcDiffCluster,
                                                targetDiffCluster,
                                                keyspaceTable.keyspace,
                                                source,
                                                from,
                                                split.end,
                                                specificTokens,
                                                reverseReadProbability);

    final String timerName = String.format("%s.%s.split_times", metricsPrefix, keyspaceTable.table);
    try (@SuppressWarnings("unused") Timer.Context ignored = metrics.timer(timerName).time()) {
        accumulated.accumulate(diffTable(context,
                                         (failure, tok) -> tracker.recordError(keyspaceTable, tok, failure),
                                         (mismatchType, tok) -> tracker.recordMismatch(keyspaceTable, mismatchType, tok),
                                         (progress, tok) -> tracker.updateStatus(keyspaceTable, progress, tok)));
        // Record the final state for the table. Per the original author, the split's end token is
        // used as the last seen token, which effectively marks the split as done, even if no
        // partition was read for that token.
        tracker.finishTable(keyspaceTable, accumulated, !rerun);
    }
}