public Map&lt;KeyspaceTablePair, RangeStats&gt; run()

in spark-job/src/main/java/org/apache/cassandra/diff/Differ.java [150:206]


    public Map<KeyspaceTablePair, RangeStats> run() {
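        // Progress for this job/split is tracked in the job metadata tables via the journal.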
        JobMetadataDb.ProgressTracker journal = trackerProvider.getTracker(journalSession, jobId, split);

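        // Work out which tables still need diffing for this split, based on each table's last
        // recorded status in the journal; the final flag indicates that specific tokens were requested.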
        Map<KeyspaceTablePair, DiffJob.TaskStatus> tablesToDiff = filterTables(keyspaceTables,
                                                                               split,
                                                                               journal::getLastStatus,
                                                                               !specificTokens.isEmpty());

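        // Metrics emitted below are prefixed with the source cluster's identifier.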
        String metricsPrefix = srcDiffCluster.clusterId.name();
        logger.info("Diffing {} for tables {}", split, tablesToDiff);

        for (Map.Entry<KeyspaceTablePair, DiffJob.TaskStatus> tableStatus : tablesToDiff.entrySet()) {
            final KeyspaceTablePair keyspaceTablePair = tableStatus.getKey();
            DiffJob.TaskStatus status = tableStatus.getValue();
            RangeStats diffStats = status.stats;

            // if this split has already been fully processed, it's being re-run to check
            // partitions with errors. In that case, we don't want to adjust the split
            // start and we don't want to update the completed count when we're finished.
            boolean isRerun = split.end.equals(status.lastToken);
            BigInteger startToken = status.lastToken == null || isRerun ? split.start : status.lastToken;
            validateRange(startToken, split.end, tokenHelper);

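            // Resolve the table definition on the source and target clusters and verify they are compatible.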
            TableSpec sourceTable = TableSpec.make(keyspaceTablePair, srcDiffCluster);
            TableSpec targetTable = TableSpec.make(keyspaceTablePair, targetDiffCluster);
            validateTableSpecs(sourceTable, targetTable);

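            // Bundle everything needed to diff this table over the (possibly resumed) token range.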
            DiffContext ctx = new DiffContext(srcDiffCluster,
                                              targetDiffCluster,
                                              keyspaceTablePair.keyspace,
                                              sourceTable,
                                              startToken,
                                              split.end,
                                              specificTokens,
                                              reverseReadProbability);

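            // Time the split for this table, recording errors, mismatches and progress updates
            // in the journal as the diff proceeds.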
            String timerName = String.format("%s.%s.split_times", metricsPrefix, keyspaceTablePair.table);
            try (@SuppressWarnings("unused") Timer.Context timer = metrics.timer(timerName).time()) {
                diffStats.accumulate(diffTable(ctx,
                                               (error, token) -> journal.recordError(keyspaceTablePair, token, error),
                                               (type, token) -> journal.recordMismatch(keyspaceTablePair, type, token),
                                               (stats, token) -> journal.updateStatus(keyspaceTablePair, stats, token)));

                // update the journal with the final state for the table. Use the split's ending token
                // as the last seen token (even though we may not have actually read any partition for
                // that token) as this effectively marks the split as done.
                journal.finishTable(keyspaceTablePair, diffStats, !isRerun);
            }
        }

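        // Collapse the per-table task statuses into their range stats, publish them as metrics
        // and return them to the caller.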
        Map<KeyspaceTablePair, RangeStats> statsByTable = tablesToDiff.entrySet()
                                                                      .stream()
                                                                      .collect(Collectors.toMap(Map.Entry::getKey,
                                                                                                e -> e.getValue().stats));
        updateMetrics(metricsPrefix, statsByTable);
        return statsByTable;
    }
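
A minimal sketch of how the per-split maps returned by run() might be combined by a caller. It assumes RangeStats.accumulate merges another RangeStats into the receiver, as the accumulate call above suggests; SplitResultMerger and mergeSplitResults are illustrative names, not part of the project.

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical helper, not part of Differ.java.
    class SplitResultMerger {

        static Map<KeyspaceTablePair, RangeStats> mergeSplitResults(List<Map<KeyspaceTablePair, RangeStats>> perSplit) {
            // Merge per-split results table by table; assumes RangeStats.accumulate(RangeStats)
            // folds one stats object into another, mirroring its use in run() above.
            Map<KeyspaceTablePair, RangeStats> merged = new HashMap<>();
            for (Map<KeyspaceTablePair, RangeStats> splitResult : perSplit) {
                splitResult.forEach((table, stats) ->
                    merged.merge(table, stats, (existing, incoming) -> {
                        existing.accumulate(incoming);
                        return existing;
                    }));
            }
            return merged;
        }
    }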