in src/java/org/apache/cassandra/repair/autorepair/AutoRepairUtils.java [766:948]
/**
 * Decides whether the local node should start an auto repair of the given type now.
 *
 * <p>Before deciding, this reconciles the auto repair history table with the current ring:
 * <ol>
 *   <li>a record's delete-hosts votes are cleared when they have not been updated for longer than
 *       the configured {@code AutoRepairHistoryClearDeleteHostsBufferInterval};</li>
 *   <li>a record whose host is not among the qualifying ring members is deleted once enough nodes
 *       have voted for it, otherwise the local node adds its own vote;</li>
 *   <li>a new record is inserted for every qualifying ring member that has none.</li>
 * </ol>
 *
 * <p>The turn is then decided in this order: a forced repair on this node, a repair already
 * recorded as ongoing on this node (e.g. after a restart), the priority list, and finally the
 * most eligible host returned by {@code getMostEligibleHostToRepair}.
 *
 * @param repairType the repair type being scheduled
 * @param myId       host id of the local node
 * @return the turn decision; {@code NOT_MY_TURN} when no decision can be made, or when any
 *         exception is raised (the exception is logged and not propagated)
 */
public static RepairTurn myTurnToRunRepair(RepairType repairType, UUID myId)
{
    try
    {
        Collection<NodeAddresses> allNodesInRing = ClusterMetadata.current().directory.addresses.values();
        logger.info("Total nodes in ring {}", allNodesInRing.size());
        TreeSet<UUID> hostIdsInCurrentRing = getHostIdsInCurrentRing(repairType, allNodesInRing);
        logger.info("Total nodes qualified for repair {}", hostIdsInCurrentRing.size());
        List<AutoRepairHistory> autoRepairHistories = getAutoRepairHistory(repairType);
        Set<UUID> autoRepairHistoryIds = new HashSet<>();
        // 1. Remove any node that is not part of group based on gossip info
        if (autoRepairHistories != null)
        {
            // The config does not depend on the record being examined; read it once.
            AutoRepairConfig config = AutoRepairService.instance.getAutoRepairConfig();
            for (AutoRepairHistory nodeHistory : autoRepairHistories)
            {
                autoRepairHistoryIds.add(nodeHistory.hostId);
                // Clear delete_hosts if the node's delete hosts have not been updated for longer than the
                // configured buffer interval, so stale votes do not accumulate forever.
                if (!nodeHistory.deleteHosts.isEmpty()
                    && config.getAutoRepairHistoryClearDeleteHostsBufferInterval().toSeconds() < TimeUnit.MILLISECONDS.toSeconds(
                        currentTimeMillis() - nodeHistory.deleteHostsUpdateTime))
                {
                    clearDeleteHosts(repairType, nodeHistory.hostId);
                    logger.info("Delete hosts for {} for repair type {} has not been updated for more than {} seconds. Delete hosts has been cleared. Delete hosts before clear {}"
                        , nodeHistory.hostId, repairType, config.getAutoRepairHistoryClearDeleteHostsBufferInterval(), nodeHistory.deleteHosts);
                }
                else if (!hostIdsInCurrentRing.contains(nodeHistory.hostId))
                {
                    // Delete only when the number of votes exceeds both 2 and half of the
                    // qualifying ring members; otherwise just record this node's vote.
                    if (nodeHistory.deleteHosts.size() > Math.max(2, hostIdsInCurrentRing.size() * 0.5))
                    {
                        logger.info("{} think {} is orphan node, will delete auto repair history for repair type {}.", nodeHistory.deleteHosts, nodeHistory.hostId, repairType);
                        deleteAutoRepairHistory(repairType, nodeHistory.hostId);
                    }
                    else
                    {
                        // I think this host should be deleted
                        logger.info("I({}) think {} is not part of ring, vote to delete it for repair type {}.", myId, nodeHistory.hostId, repairType);
                        addHostIdToDeleteHosts(repairType, myId, nodeHistory.hostId);
                    }
                }
            }
        }
        // 2. Add node to auto repair history table if a node is in gossip info
        for (UUID hostId : hostIdsInCurrentRing)
        {
            if (!autoRepairHistoryIds.contains(hostId))
            {
                logger.info("{} for repair type {} doesn't exist in the auto repair history table, insert a new record.", hostId, repairType);
                // Use one timestamp for both time fields of the new record so they are consistent.
                long now = currentTimeMillis();
                insertNewRepairHistory(repairType, hostId, now, now);
            }
        }
        // Re-read the history so the status reflects the inserts/deletes/votes made above.
        CurrentRepairStatus currentRepairStatus = getCurrentRepairStatus(repairType, getAutoRepairHistory(repairType), myId);
        if (currentRepairStatus != null)
        {
            if (logger.isDebugEnabled())
            {
                logger.debug("Latest repair status {}", currentRepairStatus);
            }
            // check if I am forced to run repair
            for (AutoRepairHistory history : currentRepairStatus.historiesWithoutOnGoingRepair)
            {
                if (history.forceRepair && history.hostId.equals(myId))
                {
                    return MY_TURN_FORCE_REPAIR;
                }
            }
        }
        // check if node was already indicated as having an ongoing repair, this may happen when a node restarts
        // before finishing repairing.
        if (currentRepairStatus != null && currentRepairStatus.getAllHostsWithOngoingRepair().contains(myId))
        {
            logAlreadyMyTurn();
            // use the previously chosen turn.
            if (currentRepairStatus.myRepairHistory != null && currentRepairStatus.myRepairHistory.repairTurn != null)
            {
                return RepairTurn.valueOf(currentRepairStatus.myRepairHistory.repairTurn);
            }
            return MY_TURN;
        }
        int parallelRepairNumber = getMaxNumberOfNodeRunAutoRepair(repairType,
                                                                   autoRepairHistories == null ? 0 : autoRepairHistories.size());
        logger.info("Will run repairs concurrently on {} node(s)", parallelRepairNumber);
        if (currentRepairStatus == null || parallelRepairNumber > currentRepairStatus.hostIdsWithOnGoingRepair.size())
        {
            // more repairs can be run, I might be the new one
            if (autoRepairHistories != null)
            {
                logger.info("Auto repair history table has {} records", autoRepairHistories.size());
            }
            else
            {
                // try to fetch again
                autoRepairHistories = getAutoRepairHistory(repairType);
                if (autoRepairHistories == null)
                {
                    logger.error("No record found");
                    return NOT_MY_TURN;
                }
                currentRepairStatus = getCurrentRepairStatus(repairType, autoRepairHistories, myId);
            }
            // Everything below dereferences the status; without it there is nothing to decide on.
            if (currentRepairStatus == null)
            {
                logger.error("Unable to determine the current repair status for repair type {}, not my turn.", repairType);
                return NOT_MY_TURN;
            }
            UUID priorityHostId = null;
            if (currentRepairStatus.priority != null)
            {
                for (UUID priorityID : currentRepairStatus.priority)
                {
                    // remove ids that don't belong to this ring
                    if (!hostIdsInCurrentRing.contains(priorityID))
                    {
                        logger.info("{} is not part of the current ring, will be removed from priority list.", priorityID);
                        removePriorityStatus(repairType, priorityID);
                    }
                    else
                    {
                        priorityHostId = priorityID;
                        break;
                    }
                }
            }
            if (priorityHostId != null)
            {
                if (myId.equals(priorityHostId))
                {
                    // I have a priority for repair hence it's my turn now
                    return MY_TURN_DUE_TO_PRIORITY;
                }
                logger.info("Priority list is not empty and I'm not the first node in the list, not my turn. " +
                            "First node in priority list is {}", getBroadcastAddress(priorityHostId));
                return NOT_MY_TURN;
            }
            // Determine if this node is the most eligible host to repair.
            AutoRepairHistory nodeToBeRepaired = getMostEligibleHostToRepair(repairType, currentRepairStatus, myId);
            if (nodeToBeRepaired != null)
            {
                if (nodeToBeRepaired.hostId.equals(myId))
                {
                    logger.info("This node is selected to be repaired for repair type {}", repairType);
                    return MY_TURN;
                }
                // log which node is next, which is helpful for debugging
                logger.info("Next node to be repaired for repair type {}: {} ({})", repairType,
                            getBroadcastAddress(nodeToBeRepaired.hostId),
                            nodeToBeRepaired);
            }
            // If this node is not identified as most eligible, set the repair lag time.
            if (currentRepairStatus.myRepairHistory != null)
            {
                AutoRepairMetricsManager.getMetrics(repairType)
                                        .recordRepairStartLag(currentRepairStatus.myRepairHistory.lastRepairFinishTime);
            }
        }
        else if (currentRepairStatus.hostIdsWithOnGoingForceRepair.contains(myId))
        {
            return MY_TURN_FORCE_REPAIR;
        }
        // For some reason I was not done with the repair, hence resume (maybe node restart in-between, etc.).
        // NOTE(review): this may overlap with the getAllHostsWithOngoingRepair() check above; confirm
        // whether that set already contains hostIdsWithOnGoingRepair.
        return currentRepairStatus.hostIdsWithOnGoingRepair.contains(myId) ? MY_TURN : NOT_MY_TURN;
    }
    catch (Exception e)
    {
        // Deliberate best-effort boundary: any failure means "not my turn" for this round.
        logger.error("Exception while deciding node's turn:", e);
    }
    return NOT_MY_TURN;
}