in src/java/org/apache/cassandra/repair/RepairJob.java [150:294]
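/**
 * Runs the repair phases requested by the session in order: Paxos repair, then Accord repair,
 * then data repair (optional snapshots, validation, and sync), completing this job with the
 * combined RepairResult on success or with the failure cause otherwise.
 */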
protected void runRepair()
{
    List<InetAddressAndPort> allEndpoints = new ArrayList<>(session.state.commonRange.endpoints);
    allEndpoints.add(ctx.broadcastAddressAndPort());
    TableMetadata metadata = cfs.metadata();
    Future<Void> paxosRepair;
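    // Capture the current cluster metadata epoch before any repair work starts; it is reported in the
    // consensus migration result when the repair succeeds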
    Epoch repairStartingEpoch = ClusterMetadata.current().epoch;
    Preconditions.checkArgument(session.repairData || session.repairPaxos || session.repairAccord);
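    // Paxos repair runs only when it is enabled, requested for this session, supported by the table,
    // and either Paxos v2 or the metadata keyspace is in use; Accord repair runs when the table
    // requires Accord support and the session requested it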
    boolean doPaxosRepair = paxosRepairEnabled()
                            && ((useV2() || isMetadataKeyspace()) && session.repairPaxos)
                            && metadata.supportsPaxosOperations();
    boolean doAccordRepair = metadata.requiresAccordSupport() && session.repairAccord;

    if (doPaxosRepair)
    {
        logger.info("{} {}.{} starting paxos repair", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
        paxosRepair = PaxosCleanup.cleanup(ctx, allEndpoints, metadata, desc.ranges, session.state.commonRange.hasSkippedReplicas, taskExecutor);
    }
    else
    {
        logger.info("{} {}.{} not running paxos repair", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
        paxosRepair = ImmediateFuture.success(null);
    }
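
    // Accord repair is chained after the Paxos repair future so the two phases never run concurrently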
    Future<Ranges> accordRepair;
    if (doAccordRepair)
    {
        accordRepair = paxosRepair.flatMap(unused -> {
            boolean requireAllEndpoints;
            // If the session excluded dead nodes it is not eligible for migration, and a repair at ALL is not expected to occur anyway
            if (session.excludedDeadNodes)
                requireAllEndpoints = false;
            else
            {
                // If the session is doing a data repair (which flushes sstables when it is not incremental) the barriers can run at QUORUM
                if (session.repairData && !session.isIncremental)
                    requireAllEndpoints = false;
                else
                    requireAllEndpoints = true;
            }
            logger.info("{} {}.{} starting accord repair, require all endpoints {}", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily, requireAllEndpoints);
            AccordRepair repair = new AccordRepair(ctx, cfs, desc.sessionId, desc.keyspace, desc.ranges, requireAllEndpoints, allEndpoints);
            return repair.repair(taskExecutor);
        }, taskExecutor);
    }
    else
    {
        accordRepair = paxosRepair.flatMap(unused -> {
            logger.info("{} {}.{} not running accord repair", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
            return ImmediateFuture.success(null);
        });
    }
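
    // Data repair phase: optionally snapshot all endpoints, then build Merkle trees (validation) and sync any differences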
    Future<List<SyncStat>> syncResults;
    if (session.repairData)
    {
        // Create a snapshot at all nodes unless we're using pure parallel repairs
        final Future<?> allSnapshotTasks;
        if (parallelismDegree != RepairParallelism.PARALLEL)
        {
            if (session.isIncremental)
            {
                // consistent repair does its own "snapshotting"
                allSnapshotTasks = accordRepair.map(input -> allEndpoints);
            }
            else
            {
                // Request a snapshot from every replica
                allSnapshotTasks = accordRepair.flatMap(input -> {
                    List<Future<InetAddressAndPort>> snapshotTasks = new ArrayList<>(allEndpoints.size());
                    state.phase.snapshotsSubmitted();
                    for (InetAddressAndPort endpoint : allEndpoints)
                    {
                        SnapshotTask snapshotTask = new SnapshotTask(ctx, desc, endpoint);
                        snapshotTasks.add(snapshotTask);
                        taskExecutor.execute(snapshotTask);
                    }
                    return FutureCombiner.allOf(snapshotTasks).map(a -> {
                        state.phase.snapshotsCompleted();
                        return a;
                    });
                });
            }
        }
        else
        {
            allSnapshotTasks = null;
        }

        // Run validations and the creation of sync tasks through the scheduler so it can limit the number of Merkle trees
        // held in memory at once. When all validations complete, submit the sync tasks outside the scheduler.
        syncResults = session.validationScheduler.schedule(() -> createSyncTasks(accordRepair, allSnapshotTasks, allEndpoints), taskExecutor)
                                                 .flatMap(this::executeTasks, taskExecutor);
    }
    else
    {
        syncResults = accordRepair.flatMap(unused -> {
            logger.info("{} {}.{} not running data repair", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
            return ImmediateFuture.success(Collections.emptyList());
        });
    }
    // When all syncs complete, set the final result
    syncResults.addCallback(new FutureCallback<>()
    {
        @Override
        public void onSuccess(List<SyncStat> stats)
        {
            logger.info("{} {}.{} Successfully did repair repairData {}, repairPaxos {}, repairAccord {}, excludedDeadNodes {}", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily, session.repairData, session.repairPaxos, session.repairAccord, session.excludedDeadNodes);
            state.phase.success();
            if (!session.previewKind.isPreview() && session.repairData)
            {
                logger.info("{} {}.{} is fully synced", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
                SystemDistributedKeyspace.successfulRepairJob(session.getId(), desc.keyspace, desc.columnFamily);
            }
            cfs.metric.repairsCompleted.inc();
            logger.info("Completing repair with excludedDeadNodes {}", session.excludedDeadNodes);
            ConsensusMigrationRepairResult cmrs = ConsensusMigrationRepairResult.fromRepair(repairStartingEpoch, getUnchecked(accordRepair), session.repairData, doPaxosRepair, doAccordRepair, session.excludedDeadNodes);
            trySuccess(new RepairResult(desc, stats, cmrs));
        }

        /**
         * Snapshot, validation and sync failures are all handled here
         */
        @Override
        public void onFailure(Throwable t)
        {
            logger.info("{} {}.{} Failed repair repairData {}, repairPaxos {}, repairAccord {}, excludedDeadNodes {}", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily, session.repairData, session.repairPaxos, session.repairAccord, session.excludedDeadNodes);
            state.phase.fail(t);
            abort(t);
            if (!session.previewKind.isPreview() && session.repairData)
            {
                logger.warn("{} {}.{} sync failed", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
                SystemDistributedKeyspace.failedRepairJob(session.getId(), desc.keyspace, desc.columnFamily, t);
            }
            cfs.metric.repairsCompleted.inc();
            tryFailure(t instanceof NoSuchRepairSessionExceptionWrapper
                       ? ((NoSuchRepairSessionExceptionWrapper) t).wrapped
                       : t);
        }
    }, taskExecutor);
}