in helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java [297:436]
/**
 * Computes the intermediate partition state map for a single resource by throttling the
 * state-transition messages generated for it.
 *
 * <p>If the resource is not in FULL_AUTO rebalance mode, or {@code resourceMessageMap} is empty,
 * {@code bestPossiblePartitionStateMap} is returned unchanged. Otherwise each message is
 * classified as recovery or load rebalance and passed to the throttling logic. Messages that are
 * not throttled are applied to a per-partition copy of the current state, so that later
 * messages of the same partition are classified against the already-accepted transitions.
 *
 * @param cache controller data provider, used for the state model definition and cluster config
 * @param clusterStatusMonitor receives rebalance/throttle counters; may be {@code null}
 * @param idealState ideal state of the resource (rebalance mode and state model reference)
 * @param resource the resource being processed
 * @param currentStateOutput current states and pending messages of the cluster
 * @param bestPossiblePartitionStateMap best possible state map, used to prioritize partitions
 * @param preferenceLists partition name to preference list
 * @param throttleController throttle quota controller charged by the rebalance helpers
 * @param resourceMessageMap per-partition messages for this resource; passed to the rebalance
 *     helpers, which presumably drop throttled messages from it (TODO confirm)
 * @return the intermediate partition state map for the resource
 */
private PartitionStateMap computeIntermediatePartitionState(ResourceControllerDataProvider cache,
    ClusterStatusMonitor clusterStatusMonitor, IdealState idealState, Resource resource,
    CurrentStateOutput currentStateOutput, PartitionStateMap bestPossiblePartitionStateMap,
    Map<String, List<String>> preferenceLists,
    StateTransitionThrottleController throttleController,
    Map<Partition, List<Message>> resourceMessageMap) {
  String resourceName = resource.getResourceName();
  LogUtil.logDebug(logger, _eventId, String.format("Processing resource: %s", resourceName));
  // Throttling is applied only in FULL_AUTO mode; with an empty message map there is nothing to
  // throttle.
  // TODO: The potential optimization to make the logic computation async and report the metric
  // for recovery/load rebalance.
  if (!IdealState.RebalanceMode.FULL_AUTO.equals(idealState.getRebalanceMode())
      || resourceMessageMap.isEmpty()) {
    return bestPossiblePartitionStateMap;
  }
  String stateModelDefName = idealState.getStateModelDefRef();
  StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
  Set<Partition> partitionsWithErrorStateReplica = new HashSet<>();
  Set<String> messagesForRecovery = new HashSet<>();
  Set<String> messagesForLoad = new HashSet<>();
  Set<String> messagesThrottledForRecovery = new HashSet<>();
  Set<String> messagesThrottledForLoad = new HashSet<>();
  ClusterConfig clusterConfig = cache.getClusterConfig();

  // Count errors at partition level: a partition counts once if any of its replicas is in a state
  // whose name contains "ERROR". The count only decides whether load balance is restricted to
  // downward transitions.
  for (Map.Entry<Partition, Map<String, String>> partitionEntry : currentStateOutput
      .getCurrentStateMap(resourceName).entrySet()) {
    if (partitionEntry.getValue().values().stream()
        .anyMatch(state -> state.contains(HelixDefinedState.ERROR.name()))) {
      partitionsWithErrorStateReplica.add(partitionEntry.getKey());
    }
  }
  int numPartitionsWithErrorReplica = partitionsWithErrorStateReplica.size();
  int threshold = resolveErrorOrRecoveryThresholdForLoadBalance(clusterConfig);
  // Regular load balance is allowed while the number of partitions with an ERROR replica does not
  // exceed the threshold; above it, only downward-transition load balance is allowed.
  // NOTE(review): despite the "ErrorOrRecovery" config name, partitions needing recovery are not
  // counted here, only partitions with an ERROR replica — confirm this is intended.
  boolean onlyDownwardLoadBalance = numPartitionsWithErrorReplica > threshold;

  chargePendingTransition(resource, currentStateOutput, throttleController, cache,
      preferenceLists, stateModelDef);

  // Sort partitions so that urgent partitions take the throttle quota first.
  List<Partition> partitions = new ArrayList<>(resource.getPartitions());
  partitions.sort(new PartitionPriorityComparator(bestPossiblePartitionStateMap.getStateMap(),
      currentStateOutput.getCurrentStateMap(resourceName), stateModelDef.getTopState()));
  for (Partition partition : partitions) {
    if (resourceMessageMap.get(partition) == null || resourceMessageMap.get(partition)
        .isEmpty()) {
      continue;
    }
    List<Message> messagesToThrottle = new ArrayList<>(resourceMessageMap.get(partition));
    // Per-partition copy of the current state; accepted (non-throttled) messages are applied to
    // it below so subsequent messages are classified against the projected state.
    Map<String, String> derivedCurrentStateMap =
        currentStateOutput.getCurrentStateMap(resourceName, partition).entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    List<String> preferenceList = preferenceLists.get(partition.getPartitionName());
    Map<String, Integer> requiredState = getRequiredStates(resourceName, cache, preferenceList);
    if (preferenceList != null && !preferenceList.isEmpty()) {
      // Sort messages based on the priority defined in the state model definition.
      messagesToThrottle.sort(
          new MessagePriorityComparator(preferenceList, stateModelDef.getStatePriorityMap()));
    }
    for (Message message : messagesToThrottle) {
      RebalanceType rebalanceType =
          getRebalanceTypePerMessage(requiredState, message, derivedCurrentStateMap);
      if (rebalanceType.equals(RebalanceType.RECOVERY_BALANCE)) {
        // Number of states required by the StateModelDefinition is not satisfied: recovery.
        message.setSTRebalanceType(Message.STRebalanceType.RECOVERY_REBALANCE);
        messagesForRecovery.add(message.getId());
        recoveryRebalance(resource, partition, throttleController, message, cache,
            messagesThrottledForRecovery, resourceMessageMap);
      } else if (rebalanceType.equals(RebalanceType.LOAD_BALANCE)) {
        message.setSTRebalanceType(Message.STRebalanceType.LOAD_REBALANCE);
        messagesForLoad.add(message.getId());
        loadRebalance(resource, partition, throttleController, message, cache,
            onlyDownwardLoadBalance, stateModelDef, messagesThrottledForLoad, resourceMessageMap);
      }
      // Apply only messages that were not throttled to the temporary current state map.
      if (!messagesThrottledForRecovery.contains(message.getId()) && !messagesThrottledForLoad
          .contains(message.getId())) {
        derivedCurrentStateMap.put(message.getTgtName(), message.getToState());
      }
    }
  }

  // TODO: We may need to optimize it to be async compute for intermediate state output.
  // This requires a deep copy of the current state map because states are overwritten by applying
  // messages to it (computeIntermediateMap). The copy keeps those writes out of
  // currentStateOutput, and keeps the map mutable even if the source map is not; the
  // PartitionStateMap constructor is not assumed to copy its argument.
  PartitionStateMap intermediatePartitionStateMap = new PartitionStateMap(resourceName,
      deepCopyCurrentStateMap(currentStateOutput.getCurrentStateMap(resourceName)));
  computeIntermediateMap(intermediatePartitionStateMap,
      currentStateOutput.getPendingMessageMap(resourceName), resourceMessageMap);

  if (!messagesForRecovery.isEmpty()) {
    LogUtil.logInfo(logger, _eventId, String
        .format("Recovery balance needed for %s with messages: %s", resourceName,
            messagesForRecovery));
  }
  if (!messagesForLoad.isEmpty()) {
    LogUtil.logInfo(logger, _eventId, String
        .format("Load balance needed for %s with messages: %s", resourceName, messagesForLoad));
  }
  if (!partitionsWithErrorStateReplica.isEmpty()) {
    LogUtil.logInfo(logger, _eventId, String
        .format("Partition currently has an ERROR replica in %s partitions: %s", resourceName,
            partitionsWithErrorStateReplica));
  }
  if (clusterStatusMonitor != null) {
    clusterStatusMonitor
        .updateRebalancerStats(resourceName, messagesForRecovery.size(), messagesForLoad.size(),
            messagesThrottledForRecovery.size(), messagesThrottledForLoad.size(),
            onlyDownwardLoadBalance);
  }
  if (logger.isDebugEnabled()) {
    logPartitionMapState(resourceName, new HashSet<>(resource.getPartitions()),
        messagesForRecovery, messagesThrottledForRecovery, messagesForLoad,
        messagesThrottledForLoad, currentStateOutput, bestPossiblePartitionStateMap,
        intermediatePartitionStateMap);
  }
  LogUtil.logDebug(logger, _eventId, String.format("End processing resource: %s", resourceName));
  return intermediatePartitionStateMap;
}

/**
 * Resolves the partition-count threshold above which load balance is restricted to downward
 * transitions.
 *
 * <p>The ErrorOrRecovery threshold is used when set (any value other than -1). Otherwise the
 * legacy Error threshold is used when set (any value other than 0, its default), for backward
 * compatibility. If neither is set, 1 is returned.
 *
 * @param clusterConfig the cluster configuration to read the thresholds from
 * @return the threshold to compare the error-partition count against
 */
private static int resolveErrorOrRecoveryThresholdForLoadBalance(ClusterConfig clusterConfig) {
  int errorOrRecoveryThreshold = clusterConfig.getErrorOrRecoveryPartitionThresholdForLoadBalance();
  if (errorOrRecoveryThreshold != -1) {
    return errorOrRecoveryThreshold;
  }
  int legacyErrorThreshold = clusterConfig.getErrorPartitionThresholdForLoadBalance();
  if (legacyErrorThreshold != 0) {
    return legacyErrorThreshold;
  }
  // Default threshold for ErrorOrRecoveryPartitionThresholdForLoadBalance.
  return 1;
}

/**
 * Returns a mutable deep copy of a partition -> (instance -> state) map. Both the outer map and
 * every inner map are new {@code HashMap}s, so writes to the copy never reach the source.
 *
 * @param currentStateMap the map to copy; must not be null
 * @return a mutable deep copy of {@code currentStateMap}
 */
private static Map<Partition, Map<String, String>> deepCopyCurrentStateMap(
    Map<Partition, Map<String, String>> currentStateMap) {
  Map<Partition, Map<String, String>> copy = new java.util.HashMap<>();
  for (Map.Entry<Partition, Map<String, String>> entry : currentStateMap.entrySet()) {
    copy.put(entry.getKey(), new java.util.HashMap<>(entry.getValue()));
  }
  return copy;
}