protected void runRepair()

in src/java/org/apache/cassandra/repair/RepairJob.java [150:294]


    protected void runRepair()
    {
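        // Overall flow: run Paxos cleanup (if requested), then the Accord barrier, then for data
        // repairs an optional snapshot round followed by validation and sync, and finally complete
        // this job with a RepairResult via the callback at the end of the chain.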
        List<InetAddressAndPort> allEndpoints = new ArrayList<>(session.state.commonRange.endpoints);
        allEndpoints.add(ctx.broadcastAddressAndPort());

        TableMetadata metadata = cfs.metadata();
        Future<Void> paxosRepair;
        Epoch repairStartingEpoch = ClusterMetadata.current().epoch;

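        // At least one of data, Paxos, or Accord repair must have been requested for this session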
        Preconditions.checkArgument(session.repairData || session.repairPaxos || session.repairAccord);
        boolean doPaxosRepair = paxosRepairEnabled()
                                && ((useV2() || isMetadataKeyspace()) && session.repairPaxos)
                                && metadata.supportsPaxosOperations();
        boolean doAccordRepair = metadata.requiresAccordSupport() && session.repairAccord;

        if (doPaxosRepair)
        {
            logger.info("{} {}.{} starting paxos repair", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
            paxosRepair = PaxosCleanup.cleanup(ctx, allEndpoints, metadata, desc.ranges, session.state.commonRange.hasSkippedReplicas, taskExecutor);
        }
        else
        {
            logger.info("{} {}.{} not running paxos repair", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
            paxosRepair = ImmediateFuture.success(null);
        }

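        // The Accord barrier (when requested) is chained off the Paxos repair future, so it only
        // starts once Paxos cleanup has completed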
        Future<Ranges> accordRepair;
        if (doAccordRepair)
        {
            accordRepair = paxosRepair.flatMap(unused -> {
                boolean requireAllEndpoints;
                // If the session excluded dead nodes it's not eligible for migration and is not supposed to occur at ALL anyway
                if (session.excludedDeadNodes)
                    requireAllEndpoints = false;
                else
                {
                    // If the session is doing a data repair (which flushes sstables if not incremental) we can do the barriers at QUORUM
                    if (session.repairData && !session.isIncremental)
                        requireAllEndpoints = false;
                    else
                        requireAllEndpoints = true;
                }
                logger.info("{} {}.{} starting accord repair, require all endpoints {}", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily, requireAllEndpoints);
                AccordRepair repair = new AccordRepair(ctx, cfs, desc.sessionId, desc.keyspace, desc.ranges, requireAllEndpoints, allEndpoints);
                return repair.repair(taskExecutor);
            }, taskExecutor);
        }
        else
        {
            accordRepair = paxosRepair.flatMap(unused -> {
                logger.info("{} {}.{} not running accord repair", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
                return ImmediateFuture.success(null);
            });
        }

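        // Data repair (if requested): optionally snapshot the replicas, then run validations and sync tasks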
        Future<List<SyncStat>> syncResults;
        if (session.repairData)
        {
            // Create a snapshot at all nodes unless we're using pure parallel repairs
            final Future<?> allSnapshotTasks;
            if (parallelismDegree != RepairParallelism.PARALLEL)
            {
                if (session.isIncremental)
                {
                    // consistent repair does its own "snapshotting"
                    allSnapshotTasks = accordRepair.map(input -> allEndpoints);
                }
                else
                {
                    // Request a snapshot from all replicas
                    allSnapshotTasks = accordRepair.flatMap(input -> {
                        List<Future<InetAddressAndPort>> snapshotTasks = new ArrayList<>(allEndpoints.size());
                        state.phase.snapshotsSubmitted();
                        for (InetAddressAndPort endpoint : allEndpoints)
                        {
                            SnapshotTask snapshotTask = new SnapshotTask(ctx, desc, endpoint);
                            snapshotTasks.add(snapshotTask);
                            taskExecutor.execute(snapshotTask);
                        }
                        return FutureCombiner.allOf(snapshotTasks).map(a -> {
                            state.phase.snapshotsCompleted();
                            return a;
                        });
                    });
                }
            }
            else
            {
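                // Pure parallel repairs skip the snapshot round entirely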
                allSnapshotTasks = null;
            }

            // Run validations and the creation of sync tasks in the scheduler, so it can limit the number of Merkle trees
            // held in memory at once. When all validations complete, submit sync tasks outside of the scheduler.
            syncResults = session.validationScheduler.schedule(() -> createSyncTasks(accordRepair, allSnapshotTasks, allEndpoints), taskExecutor)
                                                     .flatMap(this::executeTasks, taskExecutor);
        }
        else
        {
            syncResults = accordRepair.flatMap(unused -> {
                logger.info("{} {}.{} not running data repair", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
                return ImmediateFuture.success(Collections.emptyList());
            });
        }

        // When all sync tasks complete, set the final result
        syncResults.addCallback(new FutureCallback<>()
        {
            @Override
            public void onSuccess(List<SyncStat> stats)
            {
                logger.info("{} {}.{} Successfully did repair repairData {}, repairPaxos {}, repairAccord {}, excludedDeadNodes {}", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily, session.repairData, session.repairPaxos, session.repairAccord, session.excludedDeadNodes);
                state.phase.success();
                if (!session.previewKind.isPreview() && session.repairData)
                {
                    logger.info("{} {}.{} is fully synced", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
                    SystemDistributedKeyspace.successfulRepairJob(session.getId(), desc.keyspace, desc.columnFamily);
                }
                cfs.metric.repairsCompleted.inc();
                logger.info("Completing repair with excludedDeadNodes {}", session.excludedDeadNodes);
                ConsensusMigrationRepairResult cmrs = ConsensusMigrationRepairResult.fromRepair(repairStartingEpoch, getUnchecked(accordRepair), session.repairData, doPaxosRepair, doAccordRepair, session.excludedDeadNodes);
                trySuccess(new RepairResult(desc, stats, cmrs));
            }

            /**
             * Snapshot, validation and sync failures are all handled here
             */
            @Override
            public void onFailure(Throwable t)
            {
                logger.info("{} {}.{} Failed repair repairData {}, repairPaxos {}, repairAccord {}, excludedDeadNodes {}", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily, session.repairData, session.repairPaxos, session.repairAccord, session.excludedDeadNodes);
                state.phase.fail(t);
                abort(t);

                if (!session.previewKind.isPreview() && session.repairData)
                {
                    logger.warn("{} {}.{} sync failed", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
                    SystemDistributedKeyspace.failedRepairJob(session.getId(), desc.keyspace, desc.columnFamily, t);
                }
                cfs.metric.repairsCompleted.inc();
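                // Unwrap the wrapped NoSuchRepairSessionException before propagating the failure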
                tryFailure(t instanceof NoSuchRepairSessionExceptionWrapper
                           ? ((NoSuchRepairSessionExceptionWrapper) t).wrapped
                           : t);
            }
        }, taskExecutor);
    }