in helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java [199:297]
public MessageHandler createHandler(Message message, NotificationContext context) {
String type = message.getMsgType();
if (!type.equals(MessageType.STATE_TRANSITION.name()) && !type
.equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
throw new HelixException("Expect state-transition message type, but was "
+ message.getMsgType() + ", msgId: " + message.getMsgId());
}
String partitionKey = message.getPartitionName();
String stateModelName = message.getStateModelDef();
String resourceName = message.getResourceName();
String sessionId = message.getTgtSessionId();
int bucketSize = message.getBucketSize();
if (stateModelName == null) {
logger
.error("Fail to create msg-handler because message does not contain stateModelDef. msgId: "
+ message.getId());
return null;
}
String factoryName = message.getStateModelFactoryName();
if (factoryName == null) {
factoryName = HelixConstants.DEFAULT_STATE_MODEL_FACTORY;
}
StateModelFactory<? extends StateModel> stateModelFactory =
getStateModelFactory(stateModelName, factoryName);
if (stateModelFactory == null) {
logger.warn("Fail to create msg-handler because cannot find stateModelFactory for model: "
+ stateModelName + " using factoryName: " + factoryName + " for resource: "
+ resourceName);
return null;
}
// check if the state model definition exists and cache it
if (!_stateModelDefs.containsKey(stateModelName)) {
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
Builder keyBuilder = accessor.keyBuilder();
StateModelDefinition stateModelDef =
accessor.getProperty(keyBuilder.stateModelDef(stateModelName));
if (stateModelDef == null) {
throw new HelixException("fail to create msg-handler because stateModelDef for "
+ stateModelName + " does NOT exist");
}
_stateModelDefs.put(stateModelName, stateModelDef);
}
if (!message.getBatchMessageMode()) {
String initState = _stateModelDefs.get(message.getStateModelDef()).getInitialState();
StateModel stateModel = stateModelFactory.getStateModel(resourceName, partitionKey);
if (stateModel == null) {
stateModel = stateModelFactory.createAndAddStateModel(resourceName, partitionKey);
if (stateModelName.equals(TaskConstants.STATE_MODEL_NAME)
&& message.getToState().equals(TaskPartitionState.DROPPED.name())) {
// If stateModel is null, that means there was a reboot of the Participant. Then the
// purpose of this first message must be to drop the task. We manually set the current
// state to be the same state of fromState (which Controller inferred from JobContext) to
// allow the Participant to successfully process this dropping transition
stateModel.updateState(message.getFromState());
} else {
stateModel.updateState(initState);
}
}
if (message.getMsgType().equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
return new HelixStateTransitionCancellationHandler(stateModel, message, context);
} else {
// create currentStateDelta for this partition
// TODO: move currentStateDelta to StateTransitionMsgHandler
CurrentState currentStateDelta = new CurrentState(resourceName);
currentStateDelta.setSessionId(sessionId);
currentStateDelta.setStateModelDefRef(stateModelName);
currentStateDelta.setStateModelFactoryName(factoryName);
currentStateDelta.setBucketSize(bucketSize);
currentStateDelta.setState(partitionKey,
(stateModel.getCurrentState() == null) ? initState : stateModel.getCurrentState());
return new HelixStateTransitionHandler(stateModelFactory, stateModel, message, context,
currentStateDelta);
}
} else {
BatchMessageWrapper wrapper = stateModelFactory.getBatchMessageWrapper(resourceName);
if (wrapper == null) {
wrapper = stateModelFactory.createAndAddBatchMessageWrapper(resourceName);
}
// get executor-service for the message
TaskExecutor executor = (TaskExecutor) context.get(MapKey.TASK_EXECUTOR.toString());
if (executor == null) {
logger.error(
"fail to get executor-service for batch message: " + message.getId() + ". msgType: "
+ message.getMsgType() + ", resource: " + message.getResourceName());
return null;
}
return new BatchMessageHandler(message, context, this, wrapper, executor);
}
}