public MessageHandler createHandler()

in helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java [199:297]


  public MessageHandler createHandler(Message message, NotificationContext context) {
    String type = message.getMsgType();

    if (!type.equals(MessageType.STATE_TRANSITION.name()) && !type
        .equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
      throw new HelixException("Expect state-transition message type, but was "
          + message.getMsgType() + ", msgId: " + message.getMsgId());
    }

    String partitionKey = message.getPartitionName();
    String stateModelName = message.getStateModelDef();
    String resourceName = message.getResourceName();
    String sessionId = message.getTgtSessionId();
    int bucketSize = message.getBucketSize();

    if (stateModelName == null) {
      logger
          .error("Fail to create msg-handler because message does not contain stateModelDef. msgId: "
              + message.getId());
      return null;
    }

    String factoryName = message.getStateModelFactoryName();
    if (factoryName == null) {
      factoryName = HelixConstants.DEFAULT_STATE_MODEL_FACTORY;
    }

    StateModelFactory<? extends StateModel> stateModelFactory =
        getStateModelFactory(stateModelName, factoryName);
    if (stateModelFactory == null) {
      logger.warn("Fail to create msg-handler because cannot find stateModelFactory for model: "
          + stateModelName + " using factoryName: " + factoryName + " for resource: "
          + resourceName);
      return null;
    }

    // check if the state model definition exists and cache it
    if (!_stateModelDefs.containsKey(stateModelName)) {
      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
      Builder keyBuilder = accessor.keyBuilder();
      StateModelDefinition stateModelDef =
          accessor.getProperty(keyBuilder.stateModelDef(stateModelName));
      if (stateModelDef == null) {
        throw new HelixException("fail to create msg-handler because stateModelDef for "
            + stateModelName + " does NOT exist");
      }
      _stateModelDefs.put(stateModelName, stateModelDef);
    }

    if (!message.getBatchMessageMode()) {
      String initState = _stateModelDefs.get(message.getStateModelDef()).getInitialState();
      StateModel stateModel = stateModelFactory.getStateModel(resourceName, partitionKey);
      if (stateModel == null) {
        stateModel = stateModelFactory.createAndAddStateModel(resourceName, partitionKey);
        if (stateModelName.equals(TaskConstants.STATE_MODEL_NAME)
            && message.getToState().equals(TaskPartitionState.DROPPED.name())) {
          // If stateModel is null, that means there was a reboot of the Participant. Then the
          // purpose of this first message must be to drop the task. We manually set the current
          // state to be the same state of fromState (which Controller inferred from JobContext) to
          // allow the Participant to successfully process this dropping transition
          stateModel.updateState(message.getFromState());
        } else {
          stateModel.updateState(initState);
        }
      }
      if (message.getMsgType().equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
        return new HelixStateTransitionCancellationHandler(stateModel, message, context);
      } else {
        // create currentStateDelta for this partition
        // TODO: move currentStateDelta to StateTransitionMsgHandler
        CurrentState currentStateDelta = new CurrentState(resourceName);
        currentStateDelta.setSessionId(sessionId);
        currentStateDelta.setStateModelDefRef(stateModelName);
        currentStateDelta.setStateModelFactoryName(factoryName);
        currentStateDelta.setBucketSize(bucketSize);

        currentStateDelta.setState(partitionKey,
            (stateModel.getCurrentState() == null) ? initState : stateModel.getCurrentState());

        return new HelixStateTransitionHandler(stateModelFactory, stateModel, message, context,
            currentStateDelta);
      }
    } else {
      BatchMessageWrapper wrapper = stateModelFactory.getBatchMessageWrapper(resourceName);
      if (wrapper == null) {
        wrapper = stateModelFactory.createAndAddBatchMessageWrapper(resourceName);
      }

      // get executor-service for the message
      TaskExecutor executor = (TaskExecutor) context.get(MapKey.TASK_EXECUTOR.toString());
      if (executor == null) {
        logger.error(
            "fail to get executor-service for batch message: " + message.getId() + ". msgType: "
                + message.getMsgType() + ", resource: " + message.getResourceName());
        return null;
      }
      return new BatchMessageHandler(message, context, this, wrapper, executor);
    }
  }