List selectMessages()

in helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java [137:250]


  List<Message> selectMessages(Map<String, LiveInstance> liveInstances,
      Map<String, String> currentStates, Map<String, Message> pendingMessages,
      List<Message> messages, Collection<Message> pendingRelayMessages,
      Map<String, Bounds> stateConstraints, final Map<String, Integer> stateTransitionPriorities,
      StateModelDefinition stateModelDef, boolean p2pMessageEnabled) {
    if (messages == null || messages.isEmpty()) {
      return Collections.emptyList();
    }
    List<Message> selectedMessages = new ArrayList<>();
    Map<String, Integer> stateCnts = new HashMap<>();

    String initialState = stateModelDef.getInitialState();
    // count currentState, if no currentState, count as in initialState
    for (String instance : liveInstances.keySet()) {
      String state = initialState;
      if (currentStates.containsKey(instance)) {
        state = currentStates.get(instance);
      }

      increaseStateCnt(stateConstraints, state, stateCnts);
    }

    // count pendingStates
    for (String instance : pendingMessages.keySet()) {
      Message message = pendingMessages.get(instance);
      increaseStateCnt(stateConstraints, message.getToState(), stateCnts);
      increaseStateCnt(stateConstraints, message.getFromState(), stateCnts);
    }

    // group messages based on state transition priority
    Map<Integer, List<Message>> messagesGroupByStateTransitPriority =
        new TreeMap<>();

    /* record all state transition messages that transition a replica from top-state */
    List<Message> fromTopStateMessages = new LinkedList<>();
    for (Message message : messages) {
      if (message.getMsgType().equals(Message.MessageType.STATE_TRANSITION_CANCELLATION.name())) {
        selectedMessages.add(message);
        continue;
      }
      String fromState = message.getFromState();
      String toState = message.getToState();
      String transition = fromState + "-" + toState;
      int priority = Integer.MAX_VALUE;

      if (stateTransitionPriorities.containsKey(transition)) {
        priority = stateTransitionPriorities.get(transition);
      }

      if (!messagesGroupByStateTransitPriority.containsKey(priority)) {
        messagesGroupByStateTransitPriority.put(priority, new ArrayList<>());
      }
      messagesGroupByStateTransitPriority.get(priority).add(message);

      if (fromState.equals(stateModelDef.getTopState())) {
        fromTopStateMessages.add(message);
      }
    }

    // select messages
    for (List<Message> messageList : messagesGroupByStateTransitPriority.values()) {
      NextMessage:
      for (Message message : messageList) {
        String toState = message.getToState();
        String fromState = message.getFromState();

        if (toState.equals(stateModelDef.getTopState())) {
          // find if there are any pending relay messages match this message.
          // if the pending relay message targets the same host, we are fine to continue send the message,
          // if it targets to different host, we should not send the message now (should send after the relay message gets expired).
          for (Message relayMsg : pendingRelayMessages) {
            if (relayMsg.getToState().equals(toState) && relayMsg.getFromState()
                .equals(fromState)) {
              LOG.info(
                  "There is pending relay message, pending relay message: {}, relay time starts {}, expiry timeout {}.",
                  relayMsg.getMsgId(), relayMsg.getRelayTime(), relayMsg.getExpiryPeriod());
              if (!relayMsg.getTgtName().equals(message.getTgtName())) {
                LOG.info(
                    "The pending relay message was sent to a different host, not send message: {}, pending relay message: {}",
                    message.getMsgId(), relayMsg.getId());
                continue NextMessage;
              }
            }
          }
        }

        if (stateConstraints.containsKey(toState)) {
          int newCnt = (stateCnts.containsKey(toState) ? stateCnts.get(toState) + 1 : 1);
          if (newCnt > stateConstraints.get(toState).getUpperBound()) {
            if (p2pMessageEnabled && toState.equals(stateModelDef.getTopState())
                && stateModelDef.isSingleTopStateModel()) {
              // attach this message as a relay message to the message to transition off current top-state replica
              if (fromTopStateMessages.size() > 0) {
                Message fromTopStateMsg = fromTopStateMessages.get(0);
                fromTopStateMsg.attachRelayMessage(message.getTgtName(), message);
                fromTopStateMessages.remove(0);
              }
            } else {
              // reach upper-bound of message for the topState, will not send the message
              LogUtil.logInfo(LOG, _eventId,
                  "Reach upper_bound: " + stateConstraints.get(toState).getUpperBound()
                      + ", not send message: " + message);
            }
            continue;
          }
        }

        increaseStateCnt(stateConstraints, message.getToState(), stateCnts);
        selectedMessages.add(message);
      }
    }

    return selectedMessages;
  }