private PartitionStateMap computeIntermediatePartitionState()

in helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java [297:436]


  /**
   * Computes the intermediate partition-state map of a single resource by running the
   * state-transition messages generated for it through the throttle controller.
   *
   * <p>If the resource is not in FULL_AUTO rebalance mode, or {@code resourceMessageMap} is
   * empty, no throttling is applied and {@code bestPossiblePartitionStateMap} is returned as-is.
   * Otherwise pending transitions are charged first, then the partitions are visited in priority
   * order and each of their messages is classified as recovery or load rebalance and handed to
   * {@code recoveryRebalance} / {@code loadRebalance}. A message whose id is not in one of the
   * throttled-message sets afterwards is applied to a per-partition copy of the current state so
   * that later messages of the same partition are classified against the derived state.
   *
   * @param cache data provider used to look up the state model definition and the cluster config
   * @param clusterStatusMonitor receives the rebalancer statistics; skipped when {@code null}
   * @param idealState supplies the rebalance mode and the state model definition reference
   * @param resource the resource being processed (name and partitions)
   * @param currentStateOutput current states and pending messages of the cluster
   * @param bestPossiblePartitionStateMap best possible states; returned directly when no
   *        throttling is needed, otherwise used for partition ordering and debug logging
   * @param preferenceLists preference list per partition name
   * @param throttleController controller passed to the charge/rebalance helpers
   * @param resourceMessageMap messages per partition; also passed to the rebalance helpers and to
   *        {@code computeIntermediateMap} (NOTE(review): the helpers presumably drop throttled
   *        messages from this map - confirm against their implementation)
   * @return {@code bestPossiblePartitionStateMap} when throttling is skipped, otherwise a new
   *         map created from the current state and updated by {@code computeIntermediateMap}
   */
  private PartitionStateMap computeIntermediatePartitionState(ResourceControllerDataProvider cache,
      ClusterStatusMonitor clusterStatusMonitor, IdealState idealState, Resource resource,
      CurrentStateOutput currentStateOutput, PartitionStateMap bestPossiblePartitionStateMap,
      Map<String, List<String>> preferenceLists,
      StateTransitionThrottleController throttleController,
      Map<Partition, List<Message>> resourceMessageMap) {
    String resourceName = resource.getResourceName();
    LogUtil.logDebug(logger, _eventId, String.format("Processing resource: %s", resourceName));

    // Throttling is applied only on FULL-AUTO mode and if the resource message map is empty, no throttling needed.
    // TODO: The potential optimization to make the logic computation async and report the metric for recovery/load
    // rebalance.
    if (!IdealState.RebalanceMode.FULL_AUTO.equals(idealState.getRebalanceMode())
        || resourceMessageMap.isEmpty()) {
      return bestPossiblePartitionStateMap;
    }

    String stateModelDefName = idealState.getStateModelDefRef();
    StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);

    Set<Partition> partitionsWithErrorStateReplica = new HashSet<>();
    Set<String> messagesForRecovery = new HashSet<>();
    Set<String> messagesForLoad = new HashSet<>();
    Set<String> messagesThrottledForRecovery = new HashSet<>();
    Set<String> messagesThrottledForLoad = new HashSet<>();
    ClusterConfig clusterConfig = cache.getClusterConfig();

    // Keep the error count at partition level: a partition is counted once if any of its replicas
    // reports a state containing ERROR. This count only drives the downward-only load balance
    // decision below.
    for (Map.Entry<Partition, Map<String, String>> currentStateEntry :
        currentStateOutput.getCurrentStateMap(resourceName).entrySet()) {
      if (currentStateEntry.getValue().values().stream()
          .anyMatch(state -> state.contains(HelixDefinedState.ERROR.name()))) {
        partitionsWithErrorStateReplica.add(currentStateEntry.getKey());
      }
    }
    int numPartitionsWithErrorReplica = partitionsWithErrorStateReplica.size();

    // If the threshold (ErrorOrRecovery) is set, then use it, if not, then check if the old
    // threshold (Error) is set. If the old threshold is set, use it. If not, use the default value
    // for the new one. This is for backward-compatibility
    int threshold = 1; // Default threshold for ErrorOrRecoveryPartitionThresholdForLoadBalance
    int errorOrRecoveryThreshold =
        clusterConfig.getErrorOrRecoveryPartitionThresholdForLoadBalance();
    if (errorOrRecoveryThreshold != -1) {
      // ErrorOrRecovery is set
      threshold = errorOrRecoveryThreshold;
    } else {
      int errorThreshold = clusterConfig.getErrorPartitionThresholdForLoadBalance();
      if (errorThreshold != 0) {
        // 0 is the default value so the old threshold has been set
        threshold = errorThreshold;
      }
    }

    // Regular load balance is performed as long as the number of partitions with an ERROR replica
    // does not exceed the threshold. Once it does, only downward-transition load balance is
    // allowed.
    boolean onlyDownwardLoadBalance = numPartitionsWithErrorReplica > threshold;

    chargePendingTransition(resource, currentStateOutput, throttleController, cache,
        preferenceLists, stateModelDef);

    // Sort partitions in case of urgent partition need to take the quota first.
    List<Partition> partitions = new ArrayList<>(resource.getPartitions());
    partitions.sort(new PartitionPriorityComparator(bestPossiblePartitionStateMap.getStateMap(),
        currentStateOutput.getCurrentStateMap(resourceName), stateModelDef.getTopState()));
    for (Partition partition : partitions) {
      List<Message> partitionMessages = resourceMessageMap.get(partition);
      if (partitionMessages == null || partitionMessages.isEmpty()) {
        continue;
      }
      // Iterate over a snapshot of the partition's message list rather than the list stored in
      // resourceMessageMap, which is handed to the rebalance helpers below.
      List<Message> messagesToThrottle = new ArrayList<>(partitionMessages);
      // This requires a copy of the current state map because some of the states will be
      // overwritten by applying messages to it.
      Map<String, String> derivedCurrentStateMap =
          currentStateOutput.getCurrentStateMap(resourceName, partition).entrySet().stream()
              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
      List<String> preferenceList = preferenceLists.get(partition.getPartitionName());
      Map<String, Integer> requiredState = getRequiredStates(resourceName, cache, preferenceList);
      if (preferenceList != null && !preferenceList.isEmpty()) {
        // Sort messages based on the priority (priority is defined in the state model definition)
        messagesToThrottle.sort(
            new MessagePriorityComparator(preferenceList, stateModelDef.getStatePriorityMap()));
      }
      for (Message message : messagesToThrottle) {
        RebalanceType rebalanceType =
            getRebalanceTypePerMessage(requiredState, message, derivedCurrentStateMap);

        // Number of states required by StateModelDefinition are not satisfied, need recovery
        if (rebalanceType.equals(RebalanceType.RECOVERY_BALANCE)) {
          message.setSTRebalanceType(Message.STRebalanceType.RECOVERY_REBALANCE);
          messagesForRecovery.add(message.getId());
          recoveryRebalance(resource, partition, throttleController, message, cache,
              messagesThrottledForRecovery, resourceMessageMap);
        } else if (rebalanceType.equals(RebalanceType.LOAD_BALANCE)) {
          message.setSTRebalanceType(Message.STRebalanceType.LOAD_REBALANCE);
          messagesForLoad.add(message.getId());
          loadRebalance(resource, partition, throttleController, message, cache,
              onlyDownwardLoadBalance, stateModelDef, messagesThrottledForLoad, resourceMessageMap);
        }

        // Apply the message to the temporary current state map unless it was throttled, so the
        // next message of this partition is classified against the resulting state.
        String messageId = message.getId();
        if (!messagesThrottledForRecovery.contains(messageId)
            && !messagesThrottledForLoad.contains(messageId)) {
          derivedCurrentStateMap.put(message.getTgtName(), message.getToState());
        }
      }
    }
    // TODO: We may need to optimize it to be async compute for intermediate state output.
    PartitionStateMap intermediatePartitionStateMap =
        new PartitionStateMap(resourceName, currentStateOutput.getCurrentStateMap(resourceName));
    computeIntermediateMap(intermediatePartitionStateMap,
        currentStateOutput.getPendingMessageMap(resourceName), resourceMessageMap);

    if (!messagesForRecovery.isEmpty()) {
      LogUtil.logInfo(logger, _eventId, String
          .format("Recovery balance needed for %s with messages: %s", resourceName,
              messagesForRecovery));
    }
    if (!messagesForLoad.isEmpty()) {
      LogUtil.logInfo(logger, _eventId, String
          .format("Load balance needed for %s with messages: %s", resourceName, messagesForLoad));
    }
    if (!partitionsWithErrorStateReplica.isEmpty()) {
      LogUtil.logInfo(logger, _eventId, String
          .format("Partition currently has an ERROR replica in %s partitions: %s", resourceName,
              partitionsWithErrorStateReplica));
    }

    if (clusterStatusMonitor != null) {
      clusterStatusMonitor
          .updateRebalancerStats(resourceName, messagesForRecovery.size(), messagesForLoad.size(),
              messagesThrottledForRecovery.size(), messagesThrottledForLoad.size(),
              onlyDownwardLoadBalance);
    }

    // Building the per-partition debug dump is only worth it when debug logging is on.
    if (logger.isDebugEnabled()) {
      logPartitionMapState(resourceName, new HashSet<>(resource.getPartitions()),
          messagesForRecovery, messagesThrottledForRecovery, messagesForLoad,
          messagesThrottledForLoad, currentStateOutput, bestPossiblePartitionStateMap,
          intermediatePartitionStateMap);
    }

    LogUtil.logDebug(logger, _eventId, String.format("End processing resource: %s", resourceName));
    return intermediatePartitionStateMap;
  }