in src/java/org/apache/cassandra/repair/autorepair/AutoRepair.java [258:382]
/**
 * Repairs the given assignments of one keyspace, grouping their token ranges into repair jobs.
 * <p>
 * Consecutive assignments that share the same table list are queued into a batch. The batch is submitted as a
 * single repair job (retried up to {@code getRepairMaxRetries} times) when it holds {@code getRepairThreads}
 * assignments, when the next assignment targets a different table list, or when the assignments run out.
 * A batch therefore never mixes ranges that were assigned to different tables.
 * <p>
 * Processing stops early in three cases:
 * <ul>
 * <li>auto-repair for {@code repairType} is disabled;</li>
 * <li>the keyspace exceeds its maximum repair time (the remaining and queued work is counted as skipped);</li>
 * <li>the thread is interrupted (the interrupt status is preserved, and the remaining and queued work is counted
 * as skipped).</li>
 * </ul>
 * When repairing one table at a time, each assignment of a table that has exceeded its maximum repair time is
 * skipped individually.
 *
 * @param repairType           the repair type being run
 * @param primaryRangeOnly     forwarded to every repair job
 * @param keyspaceName         the keyspace to repair
 * @param repairAssignments    the assignments to process, in order; assignments of the same table are expected
 *                             to be contiguous
 * @param collectedRepairStats updated with the succeeded, failed and skipped token range counts
 */
private void repairKeyspace(AutoRepairConfig.RepairType repairType, boolean primaryRangeOnly, String keyspaceName, List<RepairAssignment> repairAssignments, CollectedRepairStats collectedRepairStats)
{
    AutoRepairConfig config = AutoRepairService.instance.getAutoRepairConfig();
    AutoRepairState repairState = repairStates.get(repairType);
    // evaluate over each keyspace's repair assignments.
    repairState.setRepairKeyspaceCount(repairState.getRepairKeyspaceCount() + 1);
    int totalRepairAssignments = repairAssignments.size();
    long keyspaceStartTime = timeFunc.get();
    RepairAssignment previousAssignment = null;
    long tableStartTime = timeFunc.get();
    int totalProcessedAssignments = 0;
    // Assignments queued for the next repair job; they all share the same table list.
    List<RepairAssignment> batch = Lists.newArrayList();
    for (RepairAssignment curRepairAssignment : repairAssignments)
    {
        if (Thread.currentThread().isInterrupted())
        {
            // Interrupted while waiting on an earlier repair job. Give up on this keyspace instead of
            // submitting more jobs whose await would fail immediately.
            collectedRepairStats.skippedTokenRanges += totalRepairAssignments - totalProcessedAssignments + tokenRangesOf(batch).size();
            logger.warn("Interrupted while repairing keyspace {}, skipping its remaining assignments", keyspaceName);
            return;
        }
        try
        {
            totalProcessedAssignments++;
            boolean repairOneTableAtATime = !config.getRepairByKeyspace(repairType);
            if (previousAssignment != null && repairOneTableAtATime && !previousAssignment.tableNames.equals(curRepairAssignment.tableNames))
            {
                // In the repair assignments, the tables are appended sequentially.
                // A different table starts here, so reset the table start time.
                tableStartTime = timeFunc.get();
            }
            previousAssignment = curRepairAssignment;
            if (!config.isAutoRepairEnabled(repairType))
            {
                logger.error("Auto-repair for type {} is disabled hence not running repair", repairType);
                repairState.setRepairInProgress(false);
                return;
            }
            if (AutoRepairUtils.keyspaceMaxRepairTimeExceeded(repairType, keyspaceStartTime, totalRepairAssignments))
            {
                // The queued batch will not be repaired either, so count its ranges as skipped.
                int pendingRanges = tokenRangesOf(batch).size();
                batch.clear();
                // The current assignment was already counted as processed but is not repaired, hence the + 1.
                collectedRepairStats.skippedTokenRanges += totalRepairAssignments - totalProcessedAssignments + 1 + pendingRanges;
                logger.info("Keyspace took too much time to repair hence skipping it {}",
                            keyspaceName);
                break;
            }
            if (repairOneTableAtATime && AutoRepairUtils.tableMaxRepairTimeExceeded(repairType, tableStartTime))
            {
                collectedRepairStats.skippedTokenRanges += 1;
                logger.info("Table took too much time to repair hence skipping it table name {}.{}, token range {}",
                            keyspaceName, curRepairAssignment.tableNames, curRepairAssignment.tokenRange);
                continue;
            }
            Range<Token> tokenRange = curRepairAssignment.getTokenRange();
            logger.debug("Current Token Left side {}, right side {}",
                         tokenRange.left.toString(),
                         tokenRange.right.toString());

            // Flush the queued batch before switching to another table list, so that its ranges are repaired
            // against the tables they were assigned for.
            if (!batch.isEmpty() && !batch.get(batch.size() - 1).tableNames.equals(curRepairAssignment.tableNames))
            {
                repairAssignmentBatch(config, repairState, repairType, primaryRangeOnly, keyspaceName, batch,
                                      totalRepairAssignments, totalProcessedAssignments, collectedRepairStats);
            }
            batch.add(curRepairAssignment);
            // Compare the queue size rather than using a modulo on the processed counter. Skipped assignments
            // then do not shift batch boundaries, and a non-positive thread count cannot divide by zero.
            if (batch.size() >= config.getRepairThreads(repairType) || totalProcessedAssignments == totalRepairAssignments)
            {
                repairAssignmentBatch(config, repairState, repairType, primaryRangeOnly, keyspaceName, batch,
                                      totalRepairAssignments, totalProcessedAssignments, collectedRepairStats);
            }
            logger.info("Repair completed for {} tables {}, range {}", keyspaceName, curRepairAssignment.getTableNames(), curRepairAssignment.getTokenRange());
        }
        catch (Exception e)
        {
            logger.error("Exception while repairing keyspace {}:", keyspaceName, e);
        }
    }
    try
    {
        // Repair anything still queued, e.g. when the final assignments were skipped because their table ran
        // out of time. This is a no-op when the batch is empty.
        repairAssignmentBatch(config, repairState, repairType, primaryRangeOnly, keyspaceName, batch,
                              totalRepairAssignments, totalProcessedAssignments, collectedRepairStats);
    }
    catch (Exception e)
    {
        logger.error("Exception while repairing keyspace {}:", keyspaceName, e);
    }
}

/**
 * Submits the queued {@code batch} as a single repair job, retrying on failure, and records the outcome in
 * {@code collectedRepairStats}.
 * <p>
 * The batch is emptied up front, so an exception here never leaks its ranges into a later batch. If the thread
 * is already interrupted, the batch is not submitted and its ranges are counted as skipped. The job repairs the
 * distinct token ranges of the batch against the table list of the batch's last assignment; all assignments of
 * a batch share that table list. The last assignment's range is also used in the log messages.
 */
private void repairAssignmentBatch(AutoRepairConfig config, AutoRepairState repairState, AutoRepairConfig.RepairType repairType,
                                   boolean primaryRangeOnly, String keyspaceName, List<RepairAssignment> batch,
                                   int totalRepairAssignments, int totalProcessedAssignments,
                                   CollectedRepairStats collectedRepairStats)
{
    if (batch.isEmpty())
    {
        return;
    }
    // Use a fresh set per job. The submitted task may still reference it after this method returns
    // (e.g. after a timed-out await), so it must never be cleared and reused.
    Set<Range<Token>> ranges = tokenRangesOf(batch);
    RepairAssignment lastAssignment = batch.get(batch.size() - 1);
    batch.clear();
    if (Thread.currentThread().isInterrupted())
    {
        collectedRepairStats.skippedTokenRanges += ranges.size();
        return;
    }

    Range<Token> tokenRange = lastAssignment.getTokenRange();
    boolean success = false;
    int retryCount = 0;
    Future<?> f = null;
    while (retryCount <= config.getRepairMaxRetries(repairType))
    {
        RepairCoordinator task = repairState.getRepairRunnable(keyspaceName,
                                                               Lists.newArrayList(lastAssignment.getTableNames()),
                                                               ranges, primaryRangeOnly);
        RepairProgressListener listener = new RepairProgressListener(repairType);
        task.addProgressListener(listener);
        f = repairRunnableExecutors.get(repairType).submit(task);
        try
        {
            long jobStartTime = timeFunc.get();
            listener.await(config.getRepairSessionTimeout(repairType));
            success = listener.isSuccess();
            soakAfterRepair(jobStartTime, config.getRepairTaskMinDuration().toMilliseconds());
        }
        catch (InterruptedException e)
        {
            // Preserve the interrupt status for the caller; retrying would fail immediately anyway.
            Thread.currentThread().interrupt();
            logger.error("Exception in cond await:", e);
        }
        if (success || Thread.currentThread().isInterrupted())
        {
            break;
        }
        else if (retryCount < config.getRepairMaxRetries(repairType))
        {
            boolean cancellationStatus = f.cancel(true);
            logger.warn("Repair failed for range {}-{} for {} tables {} with cancellationStatus: {} retrying after {} seconds...",
                        tokenRange.left, tokenRange.right,
                        keyspaceName, lastAssignment.getTableNames(),
                        cancellationStatus, config.getRepairRetryBackoff(repairType).toSeconds());
            sleepFunc.accept(config.getRepairRetryBackoff(repairType).toSeconds(), TimeUnit.SECONDS);
        }
        retryCount++;
    }
    //check repair status
    if (success)
    {
        logger.info("Repair completed for range {}-{} for {} tables {}, total assignments: {}," +
                    "processed assignments: {}", tokenRange.left, tokenRange.right,
                    keyspaceName, lastAssignment.getTableNames(), totalRepairAssignments, totalProcessedAssignments);
        collectedRepairStats.succeededTokenRanges += ranges.size();
    }
    else
    {
        // f is null only when no attempt was made (a negative max-retries setting).
        boolean cancellationStatus = true;
        if (f != null)
        {
            cancellationStatus = f.cancel(true);
        }
        logger.error("Repair failed for range {}-{} for {} tables {} after {} retries, total assignments: {}," +
                     "processed assignments: {}, cancellationStatus: {}", tokenRange.left, tokenRange.right, keyspaceName,
                     lastAssignment.getTableNames(), retryCount, totalRepairAssignments, totalProcessedAssignments, cancellationStatus);
        collectedRepairStats.failedTokenRanges += ranges.size();
    }
}

/** Collects the distinct token ranges of the given assignments into a new set owned by the caller. */
private static Set<Range<Token>> tokenRangesOf(List<RepairAssignment> assignments)
{
    Set<Range<Token>> ranges = new HashSet<>();
    for (RepairAssignment assignment : assignments)
    {
        ranges.add(assignment.getTokenRange());
    }
    return ranges;
}