private void updatePendingMessages()

in helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java [140:240]


  /**
   * Records the pending messages and pending relay messages of one live instance into the
   * given {@link CurrentStateOutput}.
   *
   * <p>A pending message is recorded only if it
   * <ul>
   *   <li>is not already contained in {@code existingStaleMessages},</li>
   *   <li>is a {@code STATE_TRANSITION} or {@code STATE_TRANSITION_CANCELLATION} message,</li>
   *   <li>targets the session id reported by {@code instance.getEphemeralOwner()}, and</li>
   *   <li>refers to a resource present in {@code resourceMap} and to a partition known to
   *       that resource.</li>
   * </ul>
   * A non-batch message that {@code isStaleMessage} reports as stale (outside of the task
   * framework pipeline) is not recorded; it is handed to
   * {@code cache.addStaleMessage(...)} instead. Batch messages are recorded once per
   * resolvable partition and are not checked for staleness.
   *
   * <p>A pending relay message is recorded via
   * {@code currentStateOutput.setPendingRelayMessage(...)} only if it is flagged as a relay
   * message, is not batched, and refers to a known resource and partition.
   *
   * @param instance              the live instance whose messages are processed
   * @param cache                 data provider that collects newly detected stale messages
   * @param pendingMessages       messages currently pending on the instance
   * @param pendingRelayMessages  relay messages currently pending for the instance
   * @param existingStaleMessages messages already known to be stale; these are skipped
   * @param currentStateOutput    output object that is updated in place
   * @param resourceMap           resources to consider, keyed by resource name
   */
  private void updatePendingMessages(LiveInstance instance, BaseControllerDataProvider cache,
      Collection<Message> pendingMessages, Collection<Message> pendingRelayMessages,
      Set<Message> existingStaleMessages, CurrentStateOutput currentStateOutput,
      Map<String, Resource> resourceMap) {
    String instanceName = instance.getInstanceName();
    String instanceSessionId = instance.getEphemeralOwner();

    // update all pending messages
    for (Message message : pendingMessages) {
      // ignore existing stale messages
      if (existingStaleMessages.contains(message)) {
        continue;
      }
      // only state transition and state transition cancellation messages are tracked
      if (!MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType())
          && !MessageType.STATE_TRANSITION_CANCELLATION.name()
          .equalsIgnoreCase(message.getMsgType())) {
        continue;
      }
      // ignore messages addressed to a different session of this instance
      if (!instanceSessionId.equals(message.getTgtSessionId())) {
        continue;
      }
      String resourceName = message.getResourceName();
      Resource resource = resourceMap.get(resourceName);
      if (resource == null) {
        LogUtil.logDebug(LOG, _eventId, String.format(
            "Ignore a pending message %s for a non-exist resource %s and partition %s",
            message.getMsgId(), resourceName, message.getPartitionName()));
        continue;
      }

      if (!message.getBatchMessageMode()) {
        String partitionName = message.getPartitionName();
        Partition partition = resource.getPartition(partitionName);
        if (partition != null) {
          String currentState = currentStateOutput.getCurrentState(resourceName, partition,
              instanceName);
          // the task framework pipeline records the message without a staleness check
          if (_isTaskFrameworkPipeline || !isStaleMessage(message, currentState)) {
            setMessageState(currentStateOutput, resourceName, partition, instanceName, message);
          } else {
            cache.addStaleMessage(instanceName, message);
          }
        } else {
          LogUtil.logDebug(LOG, _eventId, String
              .format("Ignore a pending message %s for a non-exist resource %s and partition %s",
                  message.getMsgId(), resourceName, partitionName));
        }
      } else {
        // a batch message covers several partitions; record it for each one that resolves
        for (String partitionName : message.getPartitionNames()) {
          Partition partition = resource.getPartition(partitionName);
          if (partition != null) {
            setMessageState(currentStateOutput, resourceName, partition, instanceName, message);
          } else {
            // report the partition of this iteration, not the message-level partition name
            LogUtil.logDebug(LOG, _eventId, String.format(
                "Ignore a pending message %s for a non-exist resource %s and partition %s",
                message.getMsgId(), resourceName, partitionName));
          }
        }
      }

      // Add the state model into the map for lookup of Task Framework pending partitions
      if (resource.getStateModelDefRef() != null) {
        currentStateOutput.setResourceStateModelDef(resourceName, resource.getStateModelDefRef());
      }
    }

    // update all pending relay messages
    for (Message message : pendingRelayMessages) {
      if (!message.isRelayMessage()) {
        LogUtil.logWarn(LOG, _eventId,
            String.format("Not a relay message %s, ignored!", message.getMsgId()));
        continue;
      }
      String resourceName = message.getResourceName();
      Resource resource = resourceMap.get(resourceName);
      if (resource == null) {
        LogUtil.logDebug(LOG, _eventId, String.format(
            "Ignore a pending relay message %s for a non-exist resource %s and partition %s",
            message.getMsgId(), resourceName, message.getPartitionName()));
        continue;
      }

      if (!message.getBatchMessageMode()) {
        String partitionName = message.getPartitionName();
        Partition partition = resource.getPartition(partitionName);
        if (partition != null) {
          currentStateOutput.setPendingRelayMessage(resourceName, partition, instanceName, message);
        } else {
          LogUtil.logDebug(LOG, _eventId, String.format(
              "Ignore a pending relay message %s for a non-exist resource %s and partition %s",
              message.getMsgId(), resourceName, partitionName));
        }
      } else {
        // batched relay messages are not supported and are dropped with a warning
        LogUtil.logWarn(LOG, _eventId,
            String.format("A relay message %s should not be batched, ignored!", message.getMsgId()));
      }
    }
  }