private void repairKeyspace()

in src/java/org/apache/cassandra/repair/autorepair/AutoRepair.java [258:382]


    /**
     * Repairs the given assignments of a single keyspace.
     * <p>
     * Assignments are processed in order. Consecutive assignments that target the same set of tables are grouped
     * into a batch of at most {@code getRepairThreads(repairType)} token ranges. Each batch is submitted as one
     * repair session and retried up to {@code getRepairMaxRetries(repairType)} times. Ranges belonging to different
     * table sets are never mixed in one session. Ranges still pending when the loop finishes are flushed at the end.
     * <p>
     * If auto-repair for {@code repairType} is disabled, processing returns immediately. Once the keyspace time
     * budget is exceeded, the current assignment, all remaining assignments and any still-pending ranges are counted
     * as skipped. When repairing one table at a time, an assignment whose table exceeded its own time budget is
     * skipped individually.
     *
     * @param repairType           the type of repair being run
     * @param primaryRangeOnly     passed to {@code AutoRepairState#getRepairRunnable} for every session
     * @param keyspaceName         keyspace being repaired
     * @param repairAssignments    ordered assignments for this keyspace; assignments of the same table(s) are
     *                             expected to be adjacent
     * @param collectedRepairStats accumulator updated with succeeded, failed and skipped token range counts
     */
    private void repairKeyspace(AutoRepairConfig.RepairType repairType, boolean primaryRangeOnly, String keyspaceName, List<RepairAssignment> repairAssignments, CollectedRepairStats collectedRepairStats)
    {
        AutoRepairConfig config = AutoRepairService.instance.getAutoRepairConfig();
        AutoRepairState repairState = repairStates.get(repairType);

        // evaluate over each keyspace's repair assignments.
        repairState.setRepairKeyspaceCount(repairState.getRepairKeyspaceCount() + 1);

        int totalRepairAssignments = repairAssignments.size();
        long keyspaceStartTime = timeFunc.get();
        RepairAssignment previousAssignment = null;
        long tableStartTime = timeFunc.get();
        int totalProcessedAssignments = 0;
        // Token ranges waiting to be repaired together in one session. All of them belong to the table set of
        // batchAssignment, which is the most recently added assignment.
        Set<Range<Token>> ranges = new HashSet<>();
        RepairAssignment batchAssignment = null;
        for (RepairAssignment curRepairAssignment : repairAssignments)
        {
            try
            {
                totalProcessedAssignments++;
                boolean repairOneTableAtATime = !config.getRepairByKeyspace(repairType);
                if (previousAssignment != null && repairOneTableAtATime && !previousAssignment.tableNames.equals(curRepairAssignment.tableNames))
                {
                    // In the repair assignment, all the tables are appended sequentially.
                    // Check if we have a different table, and if so, we should reset the table start time.
                    tableStartTime = timeFunc.get();
                }
                previousAssignment = curRepairAssignment;
                if (!config.isAutoRepairEnabled(repairType))
                {
                    logger.error("Auto-repair for type {} is disabled hence not running repair", repairType);
                    repairState.setRepairInProgress(false);
                    return;
                }
                if (AutoRepairUtils.keyspaceMaxRepairTimeExceeded(repairType, keyspaceStartTime, repairAssignments.size()))
                {
                    // totalProcessedAssignments already includes the current assignment, which is skipped as well
                    // (hence +1). Ranges still waiting in the batch will not be repaired either.
                    collectedRepairStats.skippedTokenRanges += totalRepairAssignments - totalProcessedAssignments + 1 + ranges.size();
                    ranges.clear();
                    logger.info("Keyspace took too much time to repair hence skipping it {}",
                                keyspaceName);
                    break;
                }
                if (repairOneTableAtATime && AutoRepairUtils.tableMaxRepairTimeExceeded(repairType, tableStartTime))
                {
                    collectedRepairStats.skippedTokenRanges += 1;
                    logger.info("Table took too much time to repair hence skipping it table name {}.{}, token range {}",
                                keyspaceName, curRepairAssignment.tableNames, curRepairAssignment.tokenRange);
                    continue;
                }

                Range<Token> tokenRange = curRepairAssignment.getTokenRange();
                logger.debug("Current Token Left side {}, right side {}",
                             tokenRange.left.toString(),
                             tokenRange.right.toString());

                // A session repairs exactly one table set. Flush ranges collected for a different table set first,
                // so they are not repaired under the current assignment's table names.
                if (!ranges.isEmpty() && !batchAssignment.tableNames.equals(curRepairAssignment.tableNames))
                {
                    repairPendingRanges(repairType, primaryRangeOnly, keyspaceName, batchAssignment, ranges,
                                        totalRepairAssignments, totalProcessedAssignments - 1, collectedRepairStats);
                }
                ranges.add(tokenRange);
                batchAssignment = curRepairAssignment;
                // The batch is full once it holds repairThreads ranges. Skipped assignments do not count towards it.
                if (ranges.size() >= config.getRepairThreads(repairType) ||
                    totalProcessedAssignments == totalRepairAssignments)
                {
                    repairPendingRanges(repairType, primaryRangeOnly, keyspaceName, batchAssignment, ranges,
                                        totalRepairAssignments, totalProcessedAssignments, collectedRepairStats);
                }
                logger.info("Repair completed for {} tables {}, range {}", keyspaceName, curRepairAssignment.getTableNames(), curRepairAssignment.getTokenRange());
            }
            catch (Exception e)
            {
                logger.error("Exception while repairing keyspace {}:", keyspaceName, e);
            }
        }

        // Ranges can still be pending here, e.g. when the final assignment was skipped by the table time budget.
        // They were accepted before any limit was hit, so repair them instead of silently dropping them.
        if (!ranges.isEmpty())
        {
            try
            {
                repairPendingRanges(repairType, primaryRangeOnly, keyspaceName, batchAssignment, ranges,
                                    totalRepairAssignments, totalProcessedAssignments, collectedRepairStats);
            }
            catch (Exception e)
            {
                logger.error("Exception while repairing keyspace {}:", keyspaceName, e);
            }
        }
    }

    /**
     * Repairs the pending {@code ranges} of {@code batchAssignment}'s tables in one session, retrying on failure.
     * <p>
     * The outcome is always added to {@code collectedRepairStats} and {@code ranges} is always cleared, even when
     * building or submitting the session throws. Failed ranges are therefore counted and never leak into the next
     * batch; any exception still propagates to the caller. The session receives a snapshot of {@code ranges}, so a
     * task that outlives a failed or cancelled wait never observes later mutations of the batch. If the wait is
     * interrupted, the thread's interrupt status is restored and no further retries are attempted.
     *
     * @param batchAssignment           the most recently added assignment of the batch; supplies the table names,
     *                                  and its token range is the one shown in log messages
     * @param ranges                    pending ranges to repair; cleared on return
     * @param totalRepairAssignments    total number of assignments in the keyspace (logging only)
     * @param totalProcessedAssignments number of assignments processed so far (logging only)
     */
    private void repairPendingRanges(AutoRepairConfig.RepairType repairType, boolean primaryRangeOnly, String keyspaceName,
                                     RepairAssignment batchAssignment, Set<Range<Token>> ranges,
                                     int totalRepairAssignments, int totalProcessedAssignments,
                                     CollectedRepairStats collectedRepairStats)
    {
        AutoRepairConfig config = AutoRepairService.instance.getAutoRepairConfig();
        AutoRepairState repairState = repairStates.get(repairType);
        Range<Token> tokenRange = batchAssignment.getTokenRange();
        Set<Range<Token>> rangesToRepair = new HashSet<>(ranges);
        boolean success = false;
        int retryCount = 0;
        Future<?> f = null;
        try
        {
            while (retryCount <= config.getRepairMaxRetries(repairType))
            {
                RepairCoordinator task = repairState.getRepairRunnable(keyspaceName,
                                                                       Lists.newArrayList(batchAssignment.getTableNames()),
                                                                       rangesToRepair, primaryRangeOnly);
                RepairProgressListener listener = new RepairProgressListener(repairType);
                task.addProgressListener(listener);
                f = repairRunnableExecutors.get(repairType).submit(task);
                try
                {
                    long jobStartTime = timeFunc.get();
                    listener.await(config.getRepairSessionTimeout(repairType));
                    success = listener.isSuccess();
                    soakAfterRepair(jobStartTime, config.getRepairTaskMinDuration().toMilliseconds());
                }
                catch (InterruptedException e)
                {
                    logger.error("Exception in cond await:", e);
                    // Restore the interrupt status so the owning executor can observe it. Retrying is pointless
                    // because the next wait would be interrupted as well.
                    Thread.currentThread().interrupt();
                    break;
                }
                if (success)
                {
                    break;
                }
                else if (retryCount < config.getRepairMaxRetries(repairType))
                {
                    boolean cancellationStatus = f.cancel(true);
                    logger.warn("Repair failed for range {}-{} for {} tables {} with cancellationStatus: {} retrying after {} seconds...",
                                tokenRange.left, tokenRange.right,
                                keyspaceName, batchAssignment.getTableNames(),
                                cancellationStatus, config.getRepairRetryBackoff(repairType).toSeconds());
                    sleepFunc.accept(config.getRepairRetryBackoff(repairType).toSeconds(), TimeUnit.SECONDS);
                }
                retryCount++;
            }
        }
        finally
        {
            //check repair status
            if (success)
            {
                logger.info("Repair completed for range {}-{} for {} tables {}, total assignments: {}," +
                            "processed assignments: {}", tokenRange.left, tokenRange.right,
                            keyspaceName, batchAssignment.getTableNames(), totalRepairAssignments, totalProcessedAssignments);
                collectedRepairStats.succeededTokenRanges += ranges.size();
            }
            else
            {
                boolean cancellationStatus = true;
                if (f != null)
                {
                    cancellationStatus = f.cancel(true);
                }
                logger.error("Repair failed for range {}-{} for {} tables {} after {} retries, total assignments: {}," +
                             "processed assignments: {}, cancellationStatus: {}", tokenRange.left, tokenRange.right, keyspaceName,
                             batchAssignment.getTableNames(), retryCount, totalRepairAssignments, totalProcessedAssignments, cancellationStatus);
                collectedRepairStats.failedTokenRanges += ranges.size();
            }
            ranges.clear();
        }
    }