in helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java [137:250]
/**
 * Filters the candidate messages down to those that may be sent without exceeding the
 * per-state upper bounds given in {@code stateConstraints}.
 *
 * <p>State occupancy is tallied (through {@code increaseStateCnt}) from the current state of
 * every live instance and from both the from-state and to-state of every pending message.
 * Candidates are then examined in ascending order of their transition priority; transitions
 * with no configured priority are examined last. Cancellation messages bypass all checks.
 *
 * @param liveInstances live instances keyed by instance name; only the keys are consulted
 * @param currentStates current state per instance; an instance with no entry is counted in
 *        the state model's initial state
 * @param pendingMessages pending message per instance
 * @param messages candidate messages; {@code null} or empty yields an empty list
 * @param pendingRelayMessages relay messages still pending, consulted for candidates whose
 *        to-state is the top state
 * @param stateConstraints bounds per state; a candidate is only limited when its to-state
 *        has an entry here
 * @param stateTransitionPriorities priority per {@code "FROM-TO"} transition key
 * @param stateModelDef supplies the initial state, the top state and the single-top-state flag
 * @param p2pMessageEnabled when true, an over-limit top-state candidate of a single-top-state
 *        model is attached as a relay message instead of being logged as rejected
 * @return cancellation messages (in input order) followed by the accepted transition messages
 */
List<Message> selectMessages(Map<String, LiveInstance> liveInstances,
    Map<String, String> currentStates, Map<String, Message> pendingMessages,
    List<Message> messages, Collection<Message> pendingRelayMessages,
    Map<String, Bounds> stateConstraints, final Map<String, Integer> stateTransitionPriorities,
    StateModelDefinition stateModelDef, boolean p2pMessageEnabled) {
  if (messages == null || messages.isEmpty()) {
    return Collections.emptyList();
  }

  List<Message> accepted = new ArrayList<>();
  Map<String, Integer> occupancy = new HashMap<>();

  // Tally where every live replica currently sits; no recorded state means the initial state.
  String defaultState = stateModelDef.getInitialState();
  for (String instanceName : liveInstances.keySet()) {
    String observed = currentStates.getOrDefault(instanceName, defaultState);
    increaseStateCnt(stateConstraints, observed, occupancy);
  }

  // An in-flight transition is tallied against both of its end states.
  for (Message inFlight : pendingMessages.values()) {
    increaseStateCnt(stateConstraints, inFlight.getToState(), occupancy);
    increaseStateCnt(stateConstraints, inFlight.getFromState(), occupancy);
  }

  // Bucket the candidates by priority; the TreeMap iterates the smallest priority value first.
  Map<Integer, List<Message>> buckets = new TreeMap<>();
  // Candidates that move a replica out of the top state; used as carriers for relay messages.
  List<Message> topStateDepartures = new LinkedList<>();
  for (Message candidate : messages) {
    if (message_isCancellation(candidate)) {
      // Cancellations are always forwarded and never counted.
      accepted.add(candidate);
      continue;
    }
    String source = candidate.getFromState();
    String target = candidate.getToState();
    int rank = stateTransitionPriorities.getOrDefault(source + "-" + target, Integer.MAX_VALUE);
    buckets.computeIfAbsent(rank, unused -> new ArrayList<>()).add(candidate);
    if (source.equals(stateModelDef.getTopState())) {
      topStateDepartures.add(candidate);
    }
  }

  for (List<Message> bucket : buckets.values()) {
    for (Message candidate : bucket) {
      String target = candidate.getToState();
      String source = candidate.getFromState();

      // A top-state candidate is held back while a relay message for the same transition is
      // pending towards a different host; a relay towards the same host does not block it.
      boolean heldBackByRelay = false;
      if (target.equals(stateModelDef.getTopState())) {
        for (Message relay : pendingRelayMessages) {
          boolean sameTransition =
              relay.getToState().equals(target) && relay.getFromState().equals(source);
          if (!sameTransition) {
            continue;
          }
          LOG.info(
              "There is pending relay message, pending relay message: {}, relay time starts {}, expiry timeout {}.",
              relay.getMsgId(), relay.getRelayTime(), relay.getExpiryPeriod());
          if (!relay.getTgtName().equals(candidate.getTgtName())) {
            LOG.info(
                "The pending relay message was sent to a different host, not send message: {}, pending relay message: {}",
                candidate.getMsgId(), relay.getId());
            heldBackByRelay = true;
            break;
          }
        }
      }
      if (heldBackByRelay) {
        continue;
      }

      if (stateConstraints.containsKey(target)) {
        int projected = occupancy.getOrDefault(target, 0) + 1;
        int upperBound = stateConstraints.get(target).getUpperBound();
        if (projected > upperBound) {
          boolean relayable = p2pMessageEnabled && target.equals(stateModelDef.getTopState())
              && stateModelDef.isSingleTopStateModel();
          if (relayable) {
            // Piggy-back the candidate on the first unused message that moves a replica off
            // the top state; when none is left the candidate is simply not sent.
            if (!topStateDepartures.isEmpty()) {
              Message carrier = topStateDepartures.remove(0);
              carrier.attachRelayMessage(candidate.getTgtName(), candidate);
            }
          } else {
            LogUtil.logInfo(LOG, _eventId,
                "Reach upper_bound: " + upperBound + ", not send message: " + candidate);
          }
          continue;
        }
      }

      increaseStateCnt(stateConstraints, target, occupancy);
      accepted.add(candidate);
    }
  }
  return accepted;
}

/** Tells whether the message is a state-transition cancellation. */
private static boolean message_isCancellation(Message message) {
  return message.getMsgType().equals(Message.MessageType.STATE_TRANSITION_CANCELLATION.name());
}