private void generateMessage()

in helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java [119:291]


  private void generateMessage(final Resource resource, final BaseControllerDataProvider cache,
      final ResourcesStateMap resourcesStateMap, final CurrentStateOutput currentStateOutput,
      final HelixManager manager, final Map<String, String> sessionIdMap,
      final ClusterEventType eventType, MessageOutput output,
      Map<String, Map<String, Message>> messagesToCleanUp) {
    String resourceName = resource.getResourceName();

    StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
    if (stateModelDef == null) {
      LogUtil.logError(logger, _eventId,
          "State Model Definition null, skip generating messages for resource: " + resourceName);
      return;
    }

    for (Partition partition : resource.getPartitions()) {
      Map<String, String> instanceStateMap =
          new HashMap<>(resourcesStateMap.getInstanceStateMap(resourceName, partition));
      Map<String, String> pendingStateMap =
          currentStateOutput.getPendingStateMap(resourceName, partition);
      Map<String, String> currentStateMap =
          currentStateOutput.getCurrentStateMap(resourceName, partition);

      // The operation is combing pending state with best possible state. Since some replicas have
      // been moved from one instance to another, the instance will exist in pending state but not
      // best possible. Thus Helix need to check the pending state and cancel it.

      for (String instance : pendingStateMap.keySet()) {
        if (!instanceStateMap.containsKey(instance)) {
          instanceStateMap.put(instance, NO_DESIRED_STATE);
        }
      }

      // Look through the current state map and add DROPPED message if the instance is not in the
      // resourceStateMap. This instance may not have had been dropped by the rebalance strategy.
      // This check is required to ensure that the instances removed from the ideal state stateMap
      // are properly dropped.
      for (String instance : currentStateMap.keySet()) {
        if (!instanceStateMap.containsKey(instance)) {
          instanceStateMap.put(instance, HelixDefinedState.DROPPED.name());
        }
      }

      // we should generate message based on the desired-state priority
      // so keep generated messages in a temp map keyed by state
      // desired-state->list of generated-messages
      Map<String, List<Message>> messageMap = new HashMap<>();

      for (String instanceName : instanceStateMap.keySet()) {

        Set<Message> staleMessages = cache.getStaleMessagesByInstance(instanceName);

        String desiredState = instanceStateMap.get(instanceName);

        String currentState =
            currentStateOutput.getCurrentState(resourceName, partition, instanceName);
        Message pendingMessage =
            currentStateOutput.getPendingMessage(resourceName, partition, instanceName);
        boolean isCancellationEnabled = cache.getClusterConfig().isStateTransitionCancelEnabled();
        Message cancellationMessage =
            currentStateOutput.getCancellationMessage(resourceName, partition, instanceName);
        String nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);

        Message message = null;

        if (currentState == null) {
          currentState = stateModelDef.getInitialState();
          nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);

          if (desiredState.equals(HelixDefinedState.DROPPED.name())) {
            LogUtil.logDebug(logger, _eventId, String
                .format("No current state for partition %s in resource %s, skip the drop message",
                    partition.getPartitionName(), resourceName));

            message =
                generateCancellationMessageForPendingMessage(desiredState, currentState, nextState,
                    pendingMessage, manager, resource, partition, sessionIdMap, instanceName,
                    stateModelDef, cancellationMessage, isCancellationEnabled);
            addGeneratedMessageToMap(message, messageMap, eventType, cache, desiredState,
                resourceName, partition, currentState, nextState);

            // TODO: separate logic of resource/task message generation
            if (cache instanceof ResourceControllerDataProvider) {
              ((ResourceControllerDataProvider) cache)
                  .invalidateCachedIdealStateMapping(resourceName);
            }
            continue;
          }
        }

        if (shouldCleanUpPendingMessage(pendingMessage, sessionIdMap, instanceName, currentState,
            currentStateOutput.getEndTime(resourceName, partition, instanceName))) {
          logAndAddToCleanUp(messagesToCleanUp, pendingMessage, instanceName, resourceName,
              partition, currentState, PENDING_MESSAGE);
        }

        for (Message staleMessage : staleMessages) {
          // staleMessage can be simple or batch mode
          if ((System.currentTimeMillis() - currentStateOutput
              .getEndTime(resourceName, partition, instanceName) > DEFAULT_OBSELETE_MSG_PURGE_DELAY)
              && staleMessage.getResourceName().equals(resourceName) && sessionIdMap
              .containsKey(instanceName) && (
              staleMessage.getPartitionName().equals(partition.getPartitionName()) || (
                  staleMessage.getBatchMessageMode() && staleMessage.getPartitionNames()
                      .contains(partition.getPartitionName())))) {
            logAndAddToCleanUp(messagesToCleanUp, staleMessage, instanceName, resourceName,
                partition, currentState, STALE_MESSAGE);
          }
        }

        if (desiredState.equals(NO_DESIRED_STATE) || desiredState.equalsIgnoreCase(currentState)) {
          if (shouldCreateSTCancellation(pendingMessage, desiredState,
              stateModelDef.getInitialState())) {
            message = MessageUtil
                .createStateTransitionCancellationMessage(manager.getInstanceName(),
                    manager.getSessionId(), resource, partition.getPartitionName(), instanceName,
                    sessionIdMap.get(instanceName), stateModelDef.getId(),
                    pendingMessage.getFromState(), pendingMessage.getToState(), null,
                    cancellationMessage, isCancellationEnabled, currentState);
          }
        } else {
          if (nextState == null) {
            LogUtil.logError(logger, _eventId,
                "Unable to find a next state for resource: " + resource.getResourceName()
                    + " partition: " + partition.getPartitionName() + " from stateModelDefinition"
                    + stateModelDef.getClass() + " from:" + currentState + " to:" + desiredState);
            continue;
          }

          if (pendingMessage != null) {
            message =
                generateCancellationMessageForPendingMessage(desiredState, currentState, nextState,
                    pendingMessage, manager, resource, partition, sessionIdMap, instanceName,
                    stateModelDef, cancellationMessage, isCancellationEnabled);
          } else {
            // Create new state transition message
            message = MessageUtil
                .createStateTransitionMessage(manager.getInstanceName(), manager.getSessionId(),
                    resource, partition.getPartitionName(), instanceName, currentState, nextState,
                    sessionIdMap.get(instanceName), stateModelDef.getId());

            if (logger.isDebugEnabled()) {
              LogUtil.logDebug(logger, _eventId, String.format(
                  "Resource %s partition %s for instance %s with currentState %s and nextState %s",
                  resource.getResourceName(), partition.getPartitionName(), instanceName,
                  currentState, nextState));
            }
          }
        }
        addGeneratedMessageToMap(message, messageMap, eventType, cache, desiredState, resourceName,
            partition, currentState, nextState);
      }

      // add generated messages to output according to state priority
      List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
      for (String state : statesPriorityList) {
        if (messageMap.containsKey(state)) {
          for (Message message : messageMap.get(state)) {
            // This is for a bug where a message's target session id is null
            if (!message.isValid()) {
              LogUtil.logError(logger, _eventId, String.format(
                  "An invalid message was generated! Discarding this message. sessionIdMap: %s, CurrentStateMap: %s, InstanceStateMap: %s, AllInstances: %s, LiveInstances: %s, Message: %s",
                  sessionIdMap, currentStateOutput.getCurrentStateMap(resourceName, partition),
                  instanceStateMap, cache.getAllInstances(),
                  cache.getLiveInstances().keySet(),
                  message));
              continue; // Do not add this message
            }
            output.addMessage(resourceName, partition, message);
          }
        }
      }
    } // end of for-each-partition
  }