in helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java [851:1061]
/**
 * Handles a message-change callback for the given instance.
 *
 * <p>The steps, in order:
 * <ol>
 *   <li>On a {@code FINALIZE} notification, calls {@code reset()} and returns. On {@code INIT},
 *       calls {@code init()} and then continues with normal processing.</li>
 *   <li>If {@code messages} is null or empty, reads the new messages from ZK instead.</li>
 *   <li>If the executor is shutting down, logs the ids of the unprocessed messages and
 *       returns.</li>
 *   <li>Sorts the messages by creation time (in place) and, for each message that is not a
 *       no-op, creates a message handler. State transition (and cancellation) messages are
 *       validated; invalid ones are removed from ZK. Accepted messages are marked as read.</li>
 *   <li>Batch-creates the current state meta data for resources that do not have it yet,
 *       batch-updates the message states, and schedules a task for every collected handler.</li>
 * </ol>
 *
 * <p>Note that the passed-in {@code messages} list is sorted in place when it is non-empty.
 *
 * @param instanceName name of the instance whose messages are being processed
 * @param messages the messages to process; when null or empty the new messages are read from ZK
 * @param changeContext the notification context of this callback; cloned once per message to
 *     serve as that message's working context
 */
public void onMessage(String instanceName, List<Message> messages,
    NotificationContext changeContext) {
  HelixManager manager = changeContext.getManager();

  // If FINALIZE notification comes, reset all handler factories
  // and terminate all the thread pools
  // TODO: see if we should have a separate notification call for resetting
  if (changeContext.getType() == Type.FINALIZE) {
    reset();
    return;
  }

  if (changeContext.getType() == Type.INIT) {
    init();
    // continue to process messages
  }

  _isCleanState = false;

  // if prefetch is disabled in MessageListenerCallback, we need to read all new messages from zk.
  if (messages == null || messages.isEmpty()) {
    // If no messages are given, check and read all new messages.
    messages = readNewMessagesFromZK(manager, instanceName, changeContext.getChangeType());
  }

  if (_isShuttingDown) {
    StringBuilder sb = new StringBuilder();
    for (Message message : messages) {
      sb.append(message.getMsgId()).append(",");
    }
    LOG.info(
        "Helix task executor is shutting down, ignore unprocessed messages : " + sb.toString());
    return;
  }

  // Update message count
  if (_messageQueueMonitor != null) {
    _messageQueueMonitor.setMessageQueueBacklog(messages.size());
  }

  if (messages.isEmpty()) {
    LOG.info("No Messages to process");
    return;
  }

  // sort message by creation timestamp, so message created earlier is processed first
  Collections.sort(messages, Message.CREATE_TIME_COMPARATOR);

  HelixDataAccessor accessor = manager.getHelixDataAccessor();
  Builder keyBuilder = accessor.keyBuilder();

  // message handlers and corresponding contexts created
  Map<String, MessageHandler> stateTransitionHandlers = new HashMap<>();
  Map<String, NotificationContext> stateTransitionContexts = new HashMap<>();

  // Parallel lists: the handler at index i is scheduled with the context at index i.
  List<MessageHandler> nonStateTransitionHandlers = new ArrayList<>();
  List<NotificationContext> nonStateTransitionContexts = new ArrayList<>();

  // message to be updated in ZK
  Map<String, Message> msgsToBeUpdated = new HashMap<>();

  String sessionId = manager.getSessionId();
  List<String> curResourceNames =
      accessor.getChildNames(keyBuilder.currentStates(instanceName, sessionId));
  List<String> taskCurResourceNames =
      accessor.getChildNames(keyBuilder.taskCurrentStates(instanceName, sessionId));

  // Parallel lists: createCurStateKeys.get(i) is the key for metaCurStates.get(i).
  List<PropertyKey> createCurStateKeys = new ArrayList<>();
  List<CurrentState> metaCurStates = new ArrayList<>();
  // Resources already queued for creation in this callback, to avoid duplicate entries.
  Set<String> createCurStateNames = new HashSet<>();

  for (Message message : messages) {
    if (checkAndProcessNoOpMessage(message, instanceName, changeContext, manager, sessionId,
        stateTransitionHandlers)) {
      // skip the following operations for the no-op messages.
      continue;
    }

    NotificationContext msgWorkingContext = changeContext.clone();
    MessageHandler msgHandler = null;
    try {
      // create message handlers, if handlers not found but no exception, leave its state as NEW
      msgHandler = createMessageHandler(message, msgWorkingContext);
    } catch (Exception ex) {
      // Failed to create message handler and there is an Exception.
      int remainingRetryCount = message.getRetryCount();
      // Pass the exception as the trailing argument so that the cause and stack trace of the
      // failure are logged instead of being silently dropped.
      LOG.error(
          "Exception happens when creating Message Handler for message {}. Current remaining retry count is {}.",
          message.getMsgId(), remainingRetryCount, ex);
      // Reduce the message retry count to avoid infinite retrying.
      message.setRetryCount(remainingRetryCount - 1);
      message.setExecuteSessionId(sessionId);
      // Note that we are re-using the retry count of Message that was original designed to control
      // timeout retries. So it is not checked before the first try in order to ensure consistent
      // behavior. It is possible that we introduce a new behavior for this method. But it requires
      // us to split the configuration item so as to avoid confusion.
      if (message.getRetryCount() <= 0) {
        // If no more retry count remains, then mark the message to be UNPROCESSABLE.
        String errorMsg = String.format("No available message Handler found!"
                + " Stop processing message %s since it has zero or negative remaining retry count %d!",
            message.getMsgId(), message.getRetryCount());
        updateUnprocessableMessage(message, null, errorMsg, manager);
      }
      msgsToBeUpdated.put(message.getId(), message);
      // continue processing in the next section where handler object is double-checked.
    }

    if (msgHandler == null) {
      // Skip processing this message in this callback. The same message process will be retried
      // in the next round if retry count > 0.
      LOG.warn("There is no existing handler for message {}."
          + " Skip processing it for now. Will retry on the next callback.", message.getMsgId());
      continue;
    }

    if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name()) || message.getMsgType()
        .equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
      if (validateAndProcessStateTransitionMessage(message, manager, stateTransitionHandlers,
          msgHandler)) {
        // Need future process by triggering state transition
        String msgTarget =
            getMessageTarget(message.getResourceName(), message.getPartitionName());
        stateTransitionHandlers.put(msgTarget, msgHandler);
        stateTransitionContexts.put(msgTarget, msgWorkingContext);
      } else {
        // Skip the following operations for the invalid/expired state transition messages.
        // Also remove the message since it might block the other state transition messages.
        removeMessageFromZK(accessor, message, instanceName);
        continue;
      }
    } else {
      // Need future process non state transition messages by triggering the handler
      nonStateTransitionHandlers.add(msgHandler);
      nonStateTransitionContexts.add(msgWorkingContext);
    }

    // Update the normally processed messages
    Message markedMsg = markReadMessage(message, msgWorkingContext, manager);
    msgsToBeUpdated.put(markedMsg.getId(), markedMsg);

    // batch creation of all current state meta data
    // do it for non-controller and state transition messages only
    if (!message.isControlerMsg() && message.getMsgType()
        .equals(Message.MessageType.STATE_TRANSITION.name())) {
      String resourceName = message.getResourceName();
      if (!curResourceNames.contains(resourceName) && !taskCurResourceNames.contains(resourceName)
          && !createCurStateNames.contains(resourceName)) {
        createCurStateNames.add(resourceName);
        PropertyKey curStateKey = keyBuilder.currentState(instanceName, sessionId, resourceName);
        // Messages of the task state model use the task current state key instead, unless that
        // path is disabled through the system property.
        if (TaskConstants.STATE_MODEL_NAME.equals(message.getStateModelDef()) && !Boolean
            .getBoolean(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED)) {
          curStateKey = keyBuilder.taskCurrentState(instanceName, sessionId, resourceName);
        }
        createCurStateKeys.add(curStateKey);

        CurrentState metaCurState = new CurrentState(resourceName);
        metaCurState.setBucketSize(message.getBucketSize());
        metaCurState.setStateModelDefRef(message.getStateModelDef());
        metaCurState.setSessionId(sessionId);
        metaCurState.setBatchMessageMode(message.getBatchMessageMode());
        String ftyName = message.getStateModelFactoryName();
        if (ftyName != null) {
          metaCurState.setStateModelFactoryName(ftyName);
        } else {
          metaCurState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
        }
        metaCurStates.add(metaCurState);
      }
    }
  }

  // batch create curState meta
  if (!createCurStateKeys.isEmpty()) {
    try {
      accessor.createChildren(createCurStateKeys, metaCurStates);
    } catch (Exception e) {
      // Best effort: a failure here is logged and processing continues.
      LOG.error("fail to create cur-state znodes for messages: " + msgsToBeUpdated, e);
    }
  }

  // update message state in batch and schedule tasks for all read messages
  updateMessageState(msgsToBeUpdated.values(), accessor, instanceName);

  for (Map.Entry<String, MessageHandler> handlerEntry : stateTransitionHandlers.entrySet()) {
    MessageHandler handler = handlerEntry.getValue();
    NotificationContext context = stateTransitionContexts.get(handlerEntry.getKey());
    if (!scheduleTaskForMessage(instanceName, accessor, handler, context) && !_isShuttingDown) {
      /*
       * TODO: Checking _isShuttingDown is a workaround to avoid unnecessary ERROR partition.
       * TODO: We shall improve the shutdown process of the participant to clean up the workflow
       * TODO: completely. In detail, there is a race condition between TaskExecutor thread
       * TODO: pool shutting down and Message handler stops listening. In this gap, the message
       * TODO: will still be processed but schedule will fail. If we mark partition into ERROR
       * TODO: state, then the controller side logic might be confused.
       */
      try {
        // Record error state to the message handler.
        handler.onError(new HelixException(String
                .format("Failed to schedule the task for executing message handler for %s.",
                    handler._message.getMsgId())), MessageHandler.ErrorCode.ERROR,
            MessageHandler.ErrorType.FRAMEWORK);
      } catch (Exception ex) {
        LOG.error("Failed to trigger onError method of the message handler for {}",
            handler._message.getMsgId(), ex);
      }
    }
  }

  // The scheduling result is not checked for non state transition messages.
  for (int i = 0; i < nonStateTransitionHandlers.size(); i++) {
    MessageHandler handler = nonStateTransitionHandlers.get(i);
    NotificationContext context = nonStateTransitionContexts.get(i);
    scheduleTaskForMessage(instanceName, accessor, handler, context);
  }
}