public IdealState computeNewIdealState()

in helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java [58:199]


  /**
   * Computes a new IdealState for the given resource.
   *
   * <p>The steps, in order:
   * <ol>
   *   <li>Return the cached IdealState when {@code getCachedIdealState} yields one.</li>
   *   <li>Return an IdealState built from {@code emptyMapping} when the resource has no
   *       partitions, when there are no assignable or no active nodes, or when the replica
   *       count resolves to 0. These early results are NOT stored in the cache.</li>
   *   <li>Select candidate nodes, restricted to the resource's instance group tag when one is
   *       set on the current IdealState.</li>
   *   <li>Compute the assignment over the live-and-enabled nodes; when delayed rebalance is
   *       enabled or the active node count differs from the live-and-enabled node count, also
   *       compute an assignment over the active nodes and combine the two through
   *       {@code getFinalDelayedMapping}.</li>
   *   <li>Overlay user-defined preference lists from the ResourceConfig, build the IdealState
   *       and store its record through {@code clusterData.setCachedIdealMapping}.</li>
   * </ol>
   *
   * <p>Side effects: overwrites the {@code _rebalanceStrategy} field on every non-cached call
   * that reaches the strategy step, and calls
   * {@code DelayedRebalanceUtil.setRebalanceScheduler} when delayed rebalance is enabled.
   *
   * @param resourceName name of the resource to compute the IdealState for
   * @param currentIdealState the resource's current IdealState; source of the instance tag,
   *     replica count, state model reference, rebalance strategy and partition limit
   * @param currentStateOutput current state snapshot used to build the current mapping
   * @param clusterData cluster data provider (configs, instance sets, IdealState cache)
   * @return the cached IdealState if present, otherwise a newly generated one
   */
  public IdealState computeNewIdealState(String resourceName,
      IdealState currentIdealState, CurrentStateOutput currentStateOutput,
      ResourceControllerDataProvider clusterData) {

    IdealState cachedIdealState = getCachedIdealState(resourceName, clusterData);
    if (cachedIdealState != null) {
      LOG.debug("Use cached IdealState for {}", resourceName);
      return cachedIdealState;
    }

    LOG.info("Computing IdealState for {}", resourceName);

    List<String> allPartitions = getStablePartitionList(clusterData, currentIdealState);
    if (allPartitions.isEmpty()) {
      LOG.info("Partition count is 0 for resource {}, stop calculate ideal mapping for the resource.",
          resourceName);
      return generateNewIdealState(resourceName, currentIdealState,
          emptyMapping(currentIdealState));
    }

    Map<String, List<String>> userDefinedPreferenceList = new HashMap<>();

    ClusterConfig clusterConfig = clusterData.getClusterConfig();
    ResourceConfig resourceConfig = clusterData.getResourceConfig(resourceName);
    // Evaluated once and reused below; neither argument is reassigned in this method.
    boolean delayRebalanceEnabled =
        DelayedRebalanceUtil.isDelayRebalanceEnabled(currentIdealState, clusterConfig);

    if (resourceConfig != null) {
      userDefinedPreferenceList = resourceConfig.getPreferenceLists();
      if (!userDefinedPreferenceList.isEmpty()) {
        LOG.info("Using user defined preference list for partitions: {}",
            userDefinedPreferenceList.keySet());
      }
    }

    Set<String> assignableLiveEnabledNodes;
    Set<String> assignableNodes;

    // When the resource carries an instance group tag, only tagged instances are candidates.
    String instanceTag = currentIdealState.getInstanceGroupTag();
    if (instanceTag != null) {
      assignableLiveEnabledNodes = clusterData.getEnabledLiveInstancesWithTag(instanceTag);
      assignableNodes = clusterData.getAssignableInstancesWithTag(instanceTag);

      if (LOG.isInfoEnabled()) {
        LOG.info(String.format(
            "Found the following participants with tag %s for %s: "
                + "instances: %s, liveEnabledInstances: %s",
            instanceTag, resourceName, assignableNodes, assignableLiveEnabledNodes));
      }
    } else {
      assignableLiveEnabledNodes = clusterData.getEnabledLiveInstances();
      assignableNodes = clusterData.getAssignableInstances();
    }

    long delay = DelayedRebalanceUtil.getRebalanceDelay(currentIdealState, clusterConfig);
    // NOTE(review): activeNodes presumably adds instances that went offline/disabled within the
    // delay window to the live-enabled set - confirm against DelayedRebalanceUtil.getActiveNodes.
    Set<String> activeNodes =
        DelayedRebalanceUtil.getActiveNodes(assignableNodes, currentIdealState, assignableLiveEnabledNodes,
            clusterData.getInstanceOfflineTimeMap(),
            clusterData.getAssignableLiveInstances().keySet(),
            clusterData.getAssignableInstanceConfigMap(), delay, clusterConfig);
    if (delayRebalanceEnabled) {
      // Nodes counted as active but not currently live-and-enabled are handed to the scheduler.
      Set<String> offlineOrDisabledInstances = new HashSet<>(activeNodes);
      offlineOrDisabledInstances.removeAll(assignableLiveEnabledNodes);
      DelayedRebalanceUtil.setRebalanceScheduler(currentIdealState.getResourceName(), true,
          offlineOrDisabledInstances, clusterData.getInstanceOfflineTimeMap(),
          clusterData.getAssignableLiveInstances().keySet(),
          clusterData.getAssignableInstanceConfigMap(), delay,
          clusterConfig, _manager);
    }

    if (assignableNodes.isEmpty() || activeNodes.isEmpty()) {
      LOG.error(String.format(
          "No instances or active instances available for resource %s, "
              + "allInstances: %s, liveInstances: %s, activeInstances: %s", resourceName, assignableNodes,
          assignableLiveEnabledNodes, activeNodes));
      return generateNewIdealState(resourceName, currentIdealState,
          emptyMapping(currentIdealState));
    }

    // NOTE(review): no null check on the state model definition here; a missing definition
    // would surface as a NullPointerException at getStateCountMap below - confirm callers
    // guarantee it exists.
    StateModelDefinition stateModelDef =
        clusterData.getStateModelDef(currentIdealState.getStateModelDefRef());

    int replicaCount = currentIdealState.getReplicaCount(activeNodes.size());
    if (replicaCount == 0) {
      LOG.error("Replica count is 0 for resource {}, stop calculate ideal mapping for the resource.",
          resourceName);
      return generateNewIdealState(resourceName, currentIdealState,
          emptyMapping(currentIdealState));
    }

    LinkedHashMap<String, Integer> stateCountMap =
        stateModelDef.getStateCountMap(activeNodes.size(), replicaCount);
    Map<String, Map<String, String>> currentMapping =
        currentMapping(currentStateOutput, resourceName, allPartitions, stateCountMap);

    int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
    _rebalanceStrategy =
        getRebalanceStrategy(currentIdealState.getRebalanceStrategy(), allPartitions, resourceName,
            stateCountMap, maxPartition);

    List<String> allNodeList = new ArrayList<>(assignableNodes);
    List<String> liveEnabledAssignableNodeList = new ArrayList<>(assignableLiveEnabledNodes);

    // sort node lists to ensure consistent preferred assignments
    Collections.sort(allNodeList);
    Collections.sort(liveEnabledAssignableNodeList);

    // Assignment computed over the live-and-enabled nodes only.
    ZNRecord newIdealMapping =
        _rebalanceStrategy.computePartitionAssignment(allNodeList, liveEnabledAssignableNodeList,
            currentMapping, clusterData);
    ZNRecord finalMapping = newIdealMapping;

    if (delayRebalanceEnabled
        || liveEnabledAssignableNodeList.size() != activeNodes.size()) {
      List<String> activeNodeList = new ArrayList<>(activeNodes);
      Collections.sort(activeNodeList);
      int minActiveReplicas = DelayedRebalanceUtil.getMinActiveReplica(
          ResourceConfig.mergeIdealStateWithResourceConfig(resourceConfig, currentIdealState),
          currentIdealState, replicaCount);

      // Second assignment computed over the active nodes, then combined with the first one.
      ZNRecord newActiveMapping =
          _rebalanceStrategy.computePartitionAssignment(allNodeList, activeNodeList, currentMapping,
              clusterData);
      finalMapping = getFinalDelayedMapping(currentIdealState, newIdealMapping, newActiveMapping,
          assignableLiveEnabledNodes, replicaCount, minActiveReplicas);
    }

    // User-defined preference lists replace the computed list for any partition they name.
    finalMapping.getListFields().putAll(userDefinedPreferenceList);

    LOG.debug("currentMapping: {}", currentMapping);
    LOG.debug("stateCountMap: {}", stateCountMap);
    LOG.debug("assignableLiveEnabledNodes: {}", assignableLiveEnabledNodes);
    LOG.debug("activeNodes: {}", activeNodes);
    LOG.debug("assignableNodes: {}", assignableNodes);
    LOG.debug("maxPartition: {}", maxPartition);
    LOG.debug("newIdealMapping: {}", newIdealMapping);
    LOG.debug("finalMapping: {}", finalMapping);

    IdealState idealState = generateNewIdealState(resourceName, currentIdealState, finalMapping);
    clusterData.setCachedIdealMapping(resourceName, idealState.getRecord());
    return idealState;
  }