public static RepairTurn myTurnToRunRepair()

in src/java/org/apache/cassandra/repair/autorepair/AutoRepairUtils.java [766:948]


    /**
     * Decides whether this node should start (or resume) an auto-repair of the given type.
     * <p>
     * Before deciding, the auto repair history is reconciled with the current ring membership:
     * <ol>
     *   <li>delete-host votes that have not been updated within the configured buffer interval are cleared;</li>
     *   <li>history records of hosts that are no longer in the ring are either deleted (enough votes collected)
     *       or voted for deletion by this node;</li>
     *   <li>hosts that are in the ring but have no history record get a new record.</li>
     * </ol>
     * The turn is then resolved in this order: a pending force-repair flag for this node, an already ongoing
     * repair recorded for this node, the priority list, and finally the most eligible host as computed by
     * {@code getMostEligibleHostToRepair}.
     * <p>
     * Any exception raised while deciding is logged and results in {@code NOT_MY_TURN}; this method does not
     * propagate exceptions to the caller.
     *
     * @param repairType the repair type for which the turn is evaluated
     * @param myId       the host id of the node asking for its turn
     * @return the kind of turn granted to this node, or {@code NOT_MY_TURN} when it must not start a repair now
     */
    public static RepairTurn myTurnToRunRepair(RepairType repairType, UUID myId)
    {
        try
        {
            Collection<NodeAddresses> allNodesInRing = ClusterMetadata.current().directory.addresses.values();
            logger.info("Total nodes in ring {}", allNodesInRing.size());
            TreeSet<UUID> hostIdsInCurrentRing = getHostIdsInCurrentRing(repairType, allNodesInRing);
            logger.info("Total nodes qualified for repair {}", hostIdsInCurrentRing.size());

            List<AutoRepairHistory> autoRepairHistories = getAutoRepairHistory(repairType);
            Set<UUID> autoRepairHistoryIds = new HashSet<>();

            // 1. Remove any node that is not part of the current ring
            if (autoRepairHistories != null)
            {
                for (AutoRepairHistory nodeHistory : autoRepairHistories)
                {
                    autoRepairHistoryIds.add(nodeHistory.hostId);
                    // clear delete_hosts if the node's delete hosts has not been updated for longer than
                    // the configured buffer interval
                    AutoRepairConfig config = AutoRepairService.instance.getAutoRepairConfig();
                    if (!nodeHistory.deleteHosts.isEmpty()
                        && config.getAutoRepairHistoryClearDeleteHostsBufferInterval().toSeconds() < TimeUnit.MILLISECONDS.toSeconds(
                    currentTimeMillis() - nodeHistory.deleteHostsUpdateTime
                    ))
                    {
                        clearDeleteHosts(repairType, nodeHistory.hostId);
                        // log the interval in seconds so the value matches the unit named in the message
                        logger.info("Delete hosts for {} for repair type {} has not been updated for more than {} seconds. Delete hosts has been cleared. Delete hosts before clear {}"
                        , nodeHistory.hostId, repairType, config.getAutoRepairHistoryClearDeleteHostsBufferInterval().toSeconds(), nodeHistory.deleteHosts);
                    }
                    else if (!hostIdsInCurrentRing.contains(nodeHistory.hostId))
                    {
                        if (nodeHistory.deleteHosts.size() > Math.max(2, hostIdsInCurrentRing.size() * 0.5))
                        {
                            // More than max(2, half of the qualified ring) hosts voted that the record should be deleted
                            logger.info("{} think {} is orphan node, will delete auto repair history for repair type {}.", nodeHistory.deleteHosts, nodeHistory.hostId, repairType);
                            deleteAutoRepairHistory(repairType, nodeHistory.hostId);
                        }
                        else
                        {
                            // I think this host should be deleted
                            logger.info("I({}) think {} is not part of ring, vote to delete it for repair type {}.", myId, nodeHistory.hostId, repairType);
                            addHostIdToDeleteHosts(repairType, myId, nodeHistory.hostId);
                        }
                    }
                }
            }

            // 2. Add node to auto repair history table if a node is in the current ring
            for (UUID hostId : hostIdsInCurrentRing)
            {
                if (!autoRepairHistoryIds.contains(hostId))
                {
                    logger.info("{} for repair type {} doesn't exist in the auto repair history table, insert a new record.", hostId, repairType);
                    insertNewRepairHistory(repairType, hostId, currentTimeMillis(), currentTimeMillis());
                }
            }

            // get updated current repair status (history is re-read because steps 1 and 2 may have modified it)
            CurrentRepairStatus currentRepairStatus = getCurrentRepairStatus(repairType, getAutoRepairHistory(repairType), myId);
            if (currentRepairStatus != null)
            {
                if (logger.isDebugEnabled())
                {
                    logger.debug("Latest repair status {}", currentRepairStatus);
                }
                //check if I am forced to run repair
                for (AutoRepairHistory history : currentRepairStatus.historiesWithoutOnGoingRepair)
                {
                    if (history.forceRepair && history.hostId.equals(myId))
                    {
                        return MY_TURN_FORCE_REPAIR;
                    }
                }
            }

            // check if node was already indicated as having an ongoing repair, this may happen when a node restarts
            // before finishing repairing.
            if (currentRepairStatus != null && currentRepairStatus.getAllHostsWithOngoingRepair().contains(myId))
            {
                logAlreadyMyTurn();

                // use the previously chosen turn.
                if (currentRepairStatus.myRepairHistory != null && currentRepairStatus.myRepairHistory.repairTurn != null)
                {
                    return RepairTurn.valueOf(currentRepairStatus.myRepairHistory.repairTurn);
                }
                else
                {
                    return MY_TURN;
                }
            }

            int parallelRepairNumber = getMaxNumberOfNodeRunAutoRepair(repairType,
                                                                              autoRepairHistories == null ? 0 : autoRepairHistories.size());
            logger.info("Will run repairs concurrently on {} node(s)", parallelRepairNumber);
            if (currentRepairStatus == null || parallelRepairNumber > currentRepairStatus.hostIdsWithOnGoingRepair.size())
            {
                // more repairs can be run, I might be the new one
                if (autoRepairHistories != null)
                {
                    logger.info("Auto repair history table has {} records", autoRepairHistories.size());
                }
                else
                {
                    // try to fetch again
                    autoRepairHistories = getAutoRepairHistory(repairType);
                    if (autoRepairHistories == null)
                    {
                        logger.error("No record found");
                        return NOT_MY_TURN;
                    }

                    currentRepairStatus = getCurrentRepairStatus(repairType, autoRepairHistories, myId);
                }

                // Without a repair status nothing below can be evaluated; bail out explicitly instead of
                // relying on a NullPointerException being swallowed by the catch block.
                if (currentRepairStatus == null)
                {
                    logger.error("Unable to determine the current repair status for repair type {}", repairType);
                    return NOT_MY_TURN;
                }

                UUID priorityHostId = null;
                if (currentRepairStatus.priority != null)
                {
                    for (UUID priorityID : currentRepairStatus.priority)
                    {
                        // remove ids doesn't belong to this ring
                        if (!hostIdsInCurrentRing.contains(priorityID))
                        {
                            logger.info("{} is not part of the current ring, will be removed from priority list.", priorityID);
                            removePriorityStatus(repairType, priorityID);
                        }
                        else
                        {
                            // the first priority entry that is still in the ring wins
                            priorityHostId = priorityID;
                            break;
                        }
                    }
                }

                if (priorityHostId != null && !myId.equals(priorityHostId))
                {
                    logger.info("Priority list is not empty and I'm not the first node in the list, not my turn. " +
                                "First node in priority list is {}", getBroadcastAddress(priorityHostId));
                    return NOT_MY_TURN;
                }

                if (myId.equals(priorityHostId))
                {
                    //I have a priority for repair hence its my turn now
                    return MY_TURN_DUE_TO_PRIORITY;
                }

                // Determine if this node is the most eligible host to repair.
                AutoRepairHistory nodeToBeRepaired = getMostEligibleHostToRepair(repairType, currentRepairStatus, myId);
                if (nodeToBeRepaired != null)
                {
                    if (nodeToBeRepaired.hostId.equals(myId))
                    {
                        logger.info("This node is selected to be repaired for repair type {}", repairType);
                        return MY_TURN;
                    }

                    // log which node is next, which is helpful for debugging
                    logger.info("Next node to be repaired for repair type {}: {} ({})", repairType,
                                 getBroadcastAddress(nodeToBeRepaired.hostId),
                                 nodeToBeRepaired);
                }

                // If this node is not identified as most eligible, set the repair lag time.
                if (currentRepairStatus.myRepairHistory != null)
                {
                    AutoRepairMetricsManager.getMetrics(repairType)
                                            .recordRepairStartLag(currentRepairStatus.myRepairHistory.lastRepairFinishTime);
                }
            }
            else if (currentRepairStatus.hostIdsWithOnGoingForceRepair.contains(myId))
            {
                return MY_TURN_FORCE_REPAIR;
            }
            // for some reason I was not done with the repair hence resume (maybe node restart in-between, etc.)
            return currentRepairStatus.hostIdsWithOnGoingRepair.contains(myId) ? MY_TURN : NOT_MY_TURN;
        }
        catch (Exception e)
        {
            // deliberate best-effort: a failure to decide must never start a repair
            logger.error("Exception while deciding node's turn:", e);
        }
        return NOT_MY_TURN;
    }