public void onMessage()

in helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java [851:1061]


  /**
   * Handles a batch of messages delivered to this participant instance.
   *
   * <p>Processing stages:
   * <ol>
   * <li>A {@code FINALIZE} notification calls {@code reset()} and returns without processing any
   * message. An {@code INIT} notification calls {@code init()} and then continues with normal
   * processing.</li>
   * <li>If {@code messages} is null or empty (e.g. prefetch is disabled), new messages are read
   * from ZK with {@code readNewMessagesFromZK}.</li>
   * <li>If the executor is shutting down, the message ids are logged and nothing is
   * processed.</li>
   * <li>Messages are sorted in place by creation time and a {@link MessageHandler} is created for
   * each one. State transition (and state transition cancellation) messages are validated and
   * collected per resource/partition target, and invalid ones are removed from ZK. All other
   * message types are collected in iteration order.</li>
   * <li>Current-state meta nodes are batch-created for resources targeted by STATE_TRANSITION
   * messages that have no current state yet. The state of all touched messages is then written
   * back in batch. Finally, every collected handler is scheduled.</li>
   * </ol>
   *
   * @param instanceName  name of this participant instance; used to build the ZK message and
   *                      current-state keys
   * @param messages      messages delivered with the notification; may be null or empty, in
   *                      which case they are read from ZK. NOTE(review): the list is sorted in
   *                      place, so it presumably has to be mutable; confirm against callers.
   * @param changeContext notification context; supplies the {@link HelixManager}, the
   *                      notification type and the change type, and is cloned once per message
   */
  public void onMessage(String instanceName, List<Message> messages,
      NotificationContext changeContext) {
    HelixManager manager = changeContext.getManager();

    // A FINALIZE notification resets all handler factories and terminates the thread pools
    // (via reset()); no message is processed for it.
    // TODO: see if we should have a separate notification call for resetting
    if (changeContext.getType() == Type.FINALIZE) {
      reset();
      return;
    }

    if (changeContext.getType() == Type.INIT) {
      init();
      // INIT only initializes; fall through and process the messages as usual.
    }
    // NOTE(review): presumably marks the executor as no longer in its freshly reset state;
    // confirm against reset().
    _isCleanState = false;

    // If prefetch is disabled in MessageListenerCallback, no messages arrive with the
    // notification, so all new messages have to be read from ZK.
    if (messages == null || messages.isEmpty()) {
      messages = readNewMessagesFromZK(manager, instanceName, changeContext.getChangeType());
    }

    // While shutting down, only log the ids of the pending messages and ignore them.
    if (_isShuttingDown) {
      StringBuilder sb = new StringBuilder();
      for (Message message : messages) {
        sb.append(message.getMsgId()).append(",");
      }
      LOG.info(
          "Helix task executor is shutting down, ignore unprocessed messages : " + sb.toString());
      return;
    }

    // Report the backlog size. NOTE(review): this counts every message read, including those
    // that are skipped below as no-op or invalid.
    if (_messageQueueMonitor != null) {
      _messageQueueMonitor.setMessageQueueBacklog(messages.size());
    }

    if (messages.isEmpty()) {
      LOG.info("No Messages to process");
      return;
    }

    // Sort (in place) by creation timestamp so that earlier-created messages are processed first.
    Collections.sort(messages, Message.CREATE_TIME_COMPARATOR);

    HelixDataAccessor accessor = manager.getHelixDataAccessor();
    Builder keyBuilder = accessor.keyBuilder();

    // State transition handlers and their contexts, keyed by the resource/partition target
    // returned by getMessageTarget().
    Map<String, MessageHandler> stateTransitionHandlers = new HashMap<>();
    Map<String, NotificationContext> stateTransitionContexts = new HashMap<>();

    // Handlers and contexts for every other message type; the two lists are index-aligned.
    List<MessageHandler> nonStateTransitionHandlers = new ArrayList<>();
    List<NotificationContext> nonStateTransitionContexts = new ArrayList<>();

    // Messages whose state must be written back to ZK, keyed by message id.
    Map<String, Message> msgsToBeUpdated = new HashMap<>();

    String sessionId = manager.getSessionId();
    // Resources that already have a current state (regular or task) in this session; these do
    // not need a new current-state meta node.
    List<String> curResourceNames =
        accessor.getChildNames(keyBuilder.currentStates(instanceName, sessionId));
    List<String> taskCurResourceNames =
        accessor.getChildNames(keyBuilder.taskCurrentStates(instanceName, sessionId));
    // Index-aligned keys/records of current-state meta nodes to create, plus the resource names
    // already queued so that each resource is created at most once per batch.
    List<PropertyKey> createCurStateKeys = new ArrayList<>();
    List<CurrentState> metaCurStates = new ArrayList<>();
    Set<String> createCurStateNames = new HashSet<>();

    for (Message message : messages) {
      if (checkAndProcessNoOpMessage(message, instanceName, changeContext, manager, sessionId,
          stateTransitionHandlers)) {
        // No-op message: skip the remaining processing for it.
        continue;
      }
      // Each message gets its own copy of the notification context.
      NotificationContext msgWorkingContext = changeContext.clone();
      MessageHandler msgHandler = null;
      try {
        // Create the message handler. If no handler is found but no exception is thrown, the
        // message state is left as NEW.
        msgHandler = createMessageHandler(message, msgWorkingContext);
      } catch (Exception ex) {
        // Handler creation failed with an exception.
        int remainingRetryCount = message.getRetryCount();
        // Pass ex as the trailing argument so that its stack trace is logged instead of lost.
        LOG.error(
            "Exception happens when creating Message Handler for message {}. Current remaining retry count is {}.",
            message.getMsgId(), remainingRetryCount, ex);
        // Decrease the retry count to avoid retrying forever.
        message.setRetryCount(remainingRetryCount - 1);
        message.setExecuteSessionId(sessionId);
        // Note that we are re-using the retry count of Message that was originally designed to
        // control timeout retries. So it is not checked before the first try, in order to ensure
        // consistent behavior. A dedicated setting could be introduced for this purpose, but that
        // requires splitting the configuration item to avoid confusion.
        if (message.getRetryCount() <= 0) {
          // No retries remain: mark the message as UNPROCESSABLE.
          String errorMsg = String.format("No available message Handler found!"
                  + " Stop processing message %s since it has zero or negative remaining retry count %d!",
              message.getMsgId(), message.getRetryCount());
          updateUnprocessableMessage(message, null, errorMsg, manager);
        }
        msgsToBeUpdated.put(message.getId(), message);
        // msgHandler is still null here, so the null check below skips this message.
      }

      if (msgHandler == null) {
        // Skip processing this message in this callback. The same message will be retried in
        // the next round if its retry count is still > 0.
        LOG.warn("There is no existing handler for message {}."
            + " Skip processing it for now. Will retry on the next callback.", message.getMsgId());
        continue;
      }

      if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name()) || message.getMsgType()
          .equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
        if (validateAndProcessStateTransitionMessage(message, manager, stateTransitionHandlers,
            msgHandler)) {
          // Valid state transition: keep the handler to schedule after the batch ZK writes.
          String msgTarget =
              getMessageTarget(message.getResourceName(), message.getPartitionName());
          stateTransitionHandlers.put(msgTarget, msgHandler);
          stateTransitionContexts.put(msgTarget, msgWorkingContext);
        } else {
          // Invalid or expired state transition message: remove it from ZK, since it might
          // block other state transition messages, and skip the remaining steps for it.
          removeMessageFromZK(accessor, message, instanceName);
          continue;
        }
      } else {
        // Any other message type: keep the handler to schedule after the batch ZK writes.
        nonStateTransitionHandlers.add(msgHandler);
        nonStateTransitionContexts.add(msgWorkingContext);
      }

      // Mark the accepted message as read and queue it for the batch state update below.
      Message markedMsg = markReadMessage(message, msgWorkingContext, manager);
      msgsToBeUpdated.put(markedMsg.getId(), markedMsg);

      // Queue current-state meta data for batch creation. This applies only to non-controller
      // STATE_TRANSITION messages (cancellations excluded) whose resource has no current state
      // yet and is not already queued in this batch.
      if (!message.isControlerMsg() && message.getMsgType()
          .equals(Message.MessageType.STATE_TRANSITION.name())) {
        String resourceName = message.getResourceName();
        if (!curResourceNames.contains(resourceName) && !taskCurResourceNames.contains(resourceName)
            && !createCurStateNames.contains(resourceName)) {
          createCurStateNames.add(resourceName);
          PropertyKey curStateKey = keyBuilder.currentState(instanceName, sessionId, resourceName);
          // Messages for the task state model use the task current-state path, unless that path
          // is disabled through the TASK_CURRENT_STATE_PATH_DISABLED system property.
          if (TaskConstants.STATE_MODEL_NAME.equals(message.getStateModelDef()) && !Boolean
              .getBoolean(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED)) {
            curStateKey = keyBuilder.taskCurrentState(instanceName, sessionId, resourceName);
          }
          createCurStateKeys.add(curStateKey);

          CurrentState metaCurState = new CurrentState(resourceName);
          metaCurState.setBucketSize(message.getBucketSize());
          metaCurState.setStateModelDefRef(message.getStateModelDef());
          metaCurState.setSessionId(sessionId);
          metaCurState.setBatchMessageMode(message.getBatchMessageMode());
          // Fall back to the default state model factory when the message names none.
          String ftyName = message.getStateModelFactoryName();
          if (ftyName != null) {
            metaCurState.setStateModelFactoryName(ftyName);
          } else {
            metaCurState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
          }
          metaCurStates.add(metaCurState);
        }
      }
    }

    // Batch-create the current-state meta nodes. A failure is logged and processing continues.
    if (!createCurStateKeys.isEmpty()) {
      try {
        accessor.createChildren(createCurStateKeys, metaCurStates);
      } catch (Exception e) {
        LOG.error("fail to create cur-state znodes for messages: " + msgsToBeUpdated, e);
      }
    }

    // Write back the state of all read/unprocessable messages in batch, before any handler is
    // scheduled.
    updateMessageState(msgsToBeUpdated.values(), accessor, instanceName);

    for (Map.Entry<String, MessageHandler> handlerEntry : stateTransitionHandlers.entrySet()) {
      MessageHandler handler = handlerEntry.getValue();
      NotificationContext context = stateTransitionContexts.get(handlerEntry.getKey());
      if (!scheduleTaskForMessage(instanceName, accessor, handler, context) && !_isShuttingDown) {
        /**
         * TODO: Checking _isShuttingDown is a workaround to avoid an unnecessary ERROR partition.
         * TODO: We shall improve the shutdown process of the participant to clean up the workflow
         * TODO: completely. In detail, there is a race condition between the TaskExecutor thread
         * TODO: pool shutting down and the Message handler stopping listening. In this gap, the
         * TODO: message will still be processed but scheduling will fail. If we mark the partition
         * TODO: as ERROR, then the controller side logic might be confused.
         */
        try {
          // Record the error state on the message handler.
          handler.onError(new HelixException(String
                  .format("Failed to schedule the task for executing message handler for %s.",
                      handler._message.getMsgId())), MessageHandler.ErrorCode.ERROR,
              MessageHandler.ErrorType.FRAMEWORK);
        } catch (Exception ex) {
          LOG.error("Failed to trigger onError method of the message handler for {}",
              handler._message.getMsgId(), ex);
        }
      }
    }

    // NOTE(review): unlike state transition handlers, a scheduling failure here is ignored (no
    // onError call); confirm that this is intended.
    for (int i = 0; i < nonStateTransitionHandlers.size(); i++) {
      MessageHandler handler = nonStateTransitionHandlers.get(i);
      NotificationContext context = nonStateTransitionContexts.get(i);
      scheduleTaskForMessage(instanceName, accessor, handler, context);
    }
  }