in helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java [301:424]
/**
 * Computes the target (best possible) state of every replica of a single partition.
 *
 * <p>If {@code computeStatesOverwriteForPartition} returns an overwrite, that map is returned
 * unchanged. Otherwise the instances that currently hold (or are transitioning to) a replica are
 * merged with the preference list into a combined, sorted preference list. Existing replicas are
 * retained so the partition does not drop to zero replicas while it moves. States are then
 * assigned through {@code computeBestPossibleMap}. Surplus replicas are marked DROPPED once
 * {@code readyToDrop} allows it, and replicas currently in ERROR are reported as ERROR.
 *
 * @param liveInstances live instances, passed through to {@code computeBestPossibleMap}
 * @param stateModelDef state model of the resource; its initial state is assumed for instances
 *     that only have a pending transition
 * @param preferenceList target instances for the partition; {@code null} is treated as empty and
 *     its size is used as the replica count
 * @param currentStateOutput source of the current and pending state maps for the partition
 * @param disabledInstancesForPartition passed through to {@code computeBestPossibleMap}
 * @param idealState ideal state of the resource (resource name, WAGED enablement)
 * @param clusterConfig passed to {@code getNumExtraReplicas}; the result bounds how many
 *     instances beyond the replica count may be added at once
 * @param partition the partition being computed
 * @param monitoredResolver passed through to {@code computeStatesOverwriteForPartition}
 * @param cache controller data cache; may be {@code null}. When non-null and WAGED is enabled,
 *     capacity of newly added instances is checked via {@code checkAndReduceCapacity}
 * @return map from instance name to the target state for this partition
 */
protected Map<String, String> computeBestPossibleStateForPartition(Set<String> liveInstances,
    StateModelDefinition stateModelDef, List<String> preferenceList,
    CurrentStateOutput currentStateOutput, Set<String> disabledInstancesForPartition,
    IdealState idealState, ClusterConfig clusterConfig, Partition partition,
    MonitoredAbnormalResolver monitoredResolver, ResourceControllerDataProvider cache) {
  Optional<Map<String, String>> optionalOverwrittenStates =
      computeStatesOverwriteForPartition(stateModelDef, preferenceList, currentStateOutput,
          idealState, partition, monitoredResolver);
  if (optionalOverwrittenStates.isPresent()) {
    return optionalOverwrittenStates.get();
  }

  Map<String, String> currentStateMap = new HashMap<>(
      currentStateOutput.getCurrentStateMap(idealState.getResourceName(), partition));
  // Instances not in preference list but still have active replica, retain to avoid zero replica
  // during movement.
  List<String> currentInstances = new ArrayList<>(currentStateMap.keySet());
  Collections.sort(currentInstances);
  // Instances with a pending transition but no current state are counted as existing replicas in
  // the state model's initial state.
  Map<String, String> pendingStates =
      new HashMap<>(currentStateOutput.getPendingStateMap(idealState.getResourceName(), partition));
  for (String instance : pendingStates.keySet()) {
    if (!currentStateMap.containsKey(instance)) {
      currentStateMap.put(instance, stateModelDef.getInitialState());
      currentInstances.add(instance);
    }
  }

  Set<String> instancesToDrop = new HashSet<>();
  Iterator<String> it = currentInstances.iterator();
  while (it.hasNext()) {
    String instance = it.next();
    String state = currentStateMap.get(instance);
    // TODO: This may never be a possible case, figure out if we can safely remove this.
    if (state == null) {
      it.remove();
      // These instances are set to DROPPED after bestPossibleStateMap is computed.
      instancesToDrop.add(instance);
    }
  }

  if (preferenceList == null) {
    preferenceList = Collections.emptyList();
  }
  boolean isPreferenceListEmpty = preferenceList.isEmpty();
  int numExtraReplicas = getNumExtraReplicas(clusterConfig);
  // TODO : Keep the behavior consistent with existing state count, change back to read from idealstate
  // replicas
  int numReplicas = preferenceList.size();
  // Preference-list instances that do not hold a replica yet.
  List<String> instanceToAdd = new ArrayList<>(preferenceList);
  instanceToAdd.removeAll(currentInstances);
  List<String> combinedPreferenceList = new ArrayList<>();
  // New instances are added only while the existing replica count is within numReplicas, and only
  // as many as needed to reach numReplicas + numExtraReplicas in total.
  if (currentInstances.size() <= numReplicas
      && numReplicas + numExtraReplicas - currentInstances.size() > 0) {
    int subListSize = numReplicas + numExtraReplicas - currentInstances.size();
    combinedPreferenceList.addAll(instanceToAdd
        .subList(0, Math.min(subListSize, instanceToAdd.size())));
  }
  // Current states restricted to preference-list instances; used below to block dropping while any
  // of them is in ERROR.
  Map<String, String> currentMapWithPreferenceList = new HashMap<>(currentStateMap);
  currentMapWithPreferenceList.keySet().retainAll(preferenceList);
  combinedPreferenceList.addAll(currentInstances);
  // Sort the combined preference list by current partition state.
  // Reason: because the states are assigned to instances in the order appeared in preferenceList, if
  // we have [node1:Slave, node2:Master], we want to keep it that way, instead of assigning Master to
  // node1.
  combinedPreferenceList.sort(
      new PreferenceListNodeComparator(currentStateMap, stateModelDef, preferenceList, cache));

  // If the preference list is not empty and there are new instances to add, check (for WAGED
  // resources) that each added instance has capacity to hold the partition.
  boolean isWaged = WagedValidationUtil.isWagedEnabled(idealState) && cache != null;
  if (isWaged && !isPreferenceListEmpty && !instanceToAdd.isEmpty()) {
    for (String instance : instanceToAdd) {
      // Only instances that actually made it into combinedPreferenceList need a capacity check.
      if (combinedPreferenceList.contains(instance)) {
        if (!cache.checkAndReduceCapacity(instance, idealState.getResourceName(),
            partition.getPartitionName())) {
          // No capacity to hold the partition: remove the instance from combinedPreferenceList.
          LOG.info("Instance: {} has no capacity to hold resource: {}, partition: {}, removing "
                  + "it from combinedPreferenceList.", instance, idealState.getResourceName(),
              partition.getPartitionName());
          combinedPreferenceList.remove(instance);
        }
      }
    }
  }

  // Assign states to instances with the combined preference list.
  Map<String, String> bestPossibleStateMap =
      computeBestPossibleMap(combinedPreferenceList, stateModelDef, currentStateMap,
          liveInstances, disabledInstancesForPartition);
  for (String instance : instancesToDrop) {
    bestPossibleStateMap.put(instance, HelixDefinedState.DROPPED.name());
  }

  // If the load-balance finishes (all replicas are migrated to new instances), drop the surplus
  // replicas, taken from the tail of the sorted combinedPreferenceList.
  if (!currentMapWithPreferenceList.containsValue(HelixDefinedState.ERROR.name())
      && bestPossibleStateMap.size() > numReplicas && readyToDrop(currentStateMap,
      bestPossibleStateMap, preferenceList, combinedPreferenceList)) {
    for (int i = 0; i < combinedPreferenceList.size() - numReplicas; i++) {
      String instanceToDrop = combinedPreferenceList.get(combinedPreferenceList.size() - i - 1);
      bestPossibleStateMap.put(instanceToDrop, HelixDefinedState.DROPPED.name());
    }
  }

  // TODO: This may not be necessary, all of the instances bestPossibleStateMap should be set to ERROR
  // if necessary in the call to computeBestPossibleMap.
  // Report replicas that are currently in ERROR as ERROR. This runs last, so it overrides any DROPPED
  // assignment made above for the same instance.
  for (String instance : combinedPreferenceList) {
    // Null-safe comparison: a present key with a null state must not throw here.
    if (HelixDefinedState.ERROR.name().equals(currentStateMap.get(instance))) {
      bestPossibleStateMap.put(instance, HelixDefinedState.ERROR.name());
    }
  }
  return bestPossibleStateMap;
}