in helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java [58:199]
public IdealState computeNewIdealState(String resourceName,
IdealState currentIdealState, CurrentStateOutput currentStateOutput,
ResourceControllerDataProvider clusterData) {
IdealState cachedIdealState = getCachedIdealState(resourceName, clusterData);
if (cachedIdealState != null) {
LOG.debug("Use cached IdealState for {}", resourceName);
return cachedIdealState;
}
LOG.info("Computing IdealState for " + resourceName);
List<String> allPartitions = getStablePartitionList(clusterData, currentIdealState);
if (allPartitions.size() == 0) {
LOG.info("Partition count is 0 for resource " + resourceName
+ ", stop calculate ideal mapping for the resource.");
return generateNewIdealState(resourceName, currentIdealState,
emptyMapping(currentIdealState));
}
Map<String, List<String>> userDefinedPreferenceList = new HashMap<>();
ClusterConfig clusterConfig = clusterData.getClusterConfig();
ResourceConfig resourceConfig = clusterData.getResourceConfig(resourceName);
boolean delayRebalanceEnabled =
DelayedRebalanceUtil.isDelayRebalanceEnabled(currentIdealState, clusterConfig);
if (resourceConfig != null) {
userDefinedPreferenceList = resourceConfig.getPreferenceLists();
if (!userDefinedPreferenceList.isEmpty()) {
LOG.info("Using user defined preference list for partitions: " + userDefinedPreferenceList
.keySet());
}
}
Set<String> assignableLiveEnabledNodes;
Set<String> assignableNodes;
String instanceTag = currentIdealState.getInstanceGroupTag();
if (instanceTag != null) {
assignableLiveEnabledNodes = clusterData.getEnabledLiveInstancesWithTag(instanceTag);
assignableNodes = clusterData.getAssignableInstancesWithTag(instanceTag);
if (LOG.isInfoEnabled()) {
LOG.info(String.format(
"Found the following participants with tag %s for %s: "
+ "instances: %s, liveEnabledInstances: %s",
currentIdealState.getInstanceGroupTag(), resourceName, assignableNodes, assignableLiveEnabledNodes));
}
} else {
assignableLiveEnabledNodes = clusterData.getEnabledLiveInstances();
assignableNodes = clusterData.getAssignableInstances();
}
long delay = DelayedRebalanceUtil.getRebalanceDelay(currentIdealState, clusterConfig);
Set<String> activeNodes =
DelayedRebalanceUtil.getActiveNodes(assignableNodes, currentIdealState, assignableLiveEnabledNodes,
clusterData.getInstanceOfflineTimeMap(),
clusterData.getAssignableLiveInstances().keySet(),
clusterData.getAssignableInstanceConfigMap(), delay, clusterConfig);
if (delayRebalanceEnabled) {
Set<String> offlineOrDisabledInstances = new HashSet<>(activeNodes);
offlineOrDisabledInstances.removeAll(assignableLiveEnabledNodes);
DelayedRebalanceUtil.setRebalanceScheduler(currentIdealState.getResourceName(), true,
offlineOrDisabledInstances, clusterData.getInstanceOfflineTimeMap(),
clusterData.getAssignableLiveInstances().keySet(),
clusterData.getAssignableInstanceConfigMap(), delay,
clusterConfig, _manager);
}
if (assignableNodes.isEmpty() || activeNodes.isEmpty()) {
LOG.error(String.format(
"No instances or active instances available for resource %s, "
+ "allInstances: %s, liveInstances: %s, activeInstances: %s", resourceName, assignableNodes,
assignableLiveEnabledNodes, activeNodes));
return generateNewIdealState(resourceName, currentIdealState,
emptyMapping(currentIdealState));
}
StateModelDefinition stateModelDef =
clusterData.getStateModelDef(currentIdealState.getStateModelDefRef());
int replicaCount = currentIdealState.getReplicaCount(activeNodes.size());
if (replicaCount == 0) {
LOG.error("Replica count is 0 for resource " + resourceName
+ ", stop calculate ideal mapping for the resource.");
return generateNewIdealState(resourceName, currentIdealState,
emptyMapping(currentIdealState));
}
LinkedHashMap<String, Integer> stateCountMap =
stateModelDef.getStateCountMap(activeNodes.size(), replicaCount);
Map<String, Map<String, String>> currentMapping =
currentMapping(currentStateOutput, resourceName, allPartitions, stateCountMap);
int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
_rebalanceStrategy =
getRebalanceStrategy(currentIdealState.getRebalanceStrategy(), allPartitions, resourceName,
stateCountMap, maxPartition);
List<String> allNodeList = new ArrayList<>(assignableNodes);
List<String> liveEnabledAssignableNodeList = new ArrayList<>(assignableLiveEnabledNodes);
// sort node lists to ensure consistent preferred assignments
Collections.sort(allNodeList);
Collections.sort(liveEnabledAssignableNodeList);
ZNRecord newIdealMapping =
_rebalanceStrategy.computePartitionAssignment(allNodeList, liveEnabledAssignableNodeList,
currentMapping, clusterData);
ZNRecord finalMapping = newIdealMapping;
if (DelayedRebalanceUtil.isDelayRebalanceEnabled(currentIdealState, clusterConfig)
|| liveEnabledAssignableNodeList.size() != activeNodes.size()) {
List<String> activeNodeList = new ArrayList<>(activeNodes);
Collections.sort(activeNodeList);
int minActiveReplicas = DelayedRebalanceUtil.getMinActiveReplica(
ResourceConfig.mergeIdealStateWithResourceConfig(resourceConfig, currentIdealState),
currentIdealState, replicaCount);
ZNRecord newActiveMapping =
_rebalanceStrategy.computePartitionAssignment(allNodeList, activeNodeList, currentMapping,
clusterData);
finalMapping = getFinalDelayedMapping(currentIdealState, newIdealMapping, newActiveMapping,
assignableLiveEnabledNodes, replicaCount, minActiveReplicas);
}
finalMapping.getListFields().putAll(userDefinedPreferenceList);
LOG.debug("currentMapping: {}", currentMapping);
LOG.debug("stateCountMap: {}", stateCountMap);
LOG.debug("assignableLiveEnabledNodes: {}", assignableLiveEnabledNodes);
LOG.debug("activeNodes: {}", activeNodes);
LOG.debug("assignableNodes: {}", assignableNodes);
LOG.debug("maxPartition: {}", maxPartition);
LOG.debug("newIdealMapping: {}", newIdealMapping);
LOG.debug("finalMapping: {}", finalMapping);
IdealState idealState = generateNewIdealState(resourceName, currentIdealState, finalMapping);
clusterData.setCachedIdealMapping(resourceName, idealState.getRecord());
return idealState;
}