in saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/impl/ProcessCtrlStateMachineEngine.java [220:350]
protected StateMachineInstance forwardInternal(String stateMachineInstId, Map<String, Object> replaceParams,
boolean skip, boolean async, AsyncCallback callback)
throws EngineExecutionException {
StateMachineInstance stateMachineInstance = reloadStateMachineInstance(stateMachineInstId);
if (stateMachineInstance == null) {
throw new ForwardInvalidException("StateMachineInstance is not exits",
FrameworkErrorCode.StateMachineInstanceNotExists);
}
if (ExecutionStatus.SU.equals(stateMachineInstance.getStatus())
&& stateMachineInstance.getCompensationStatus() == null) {
return stateMachineInstance;
}
ExecutionStatus[] acceptStatus = new ExecutionStatus[] {ExecutionStatus.FA, ExecutionStatus.UN, ExecutionStatus.RU};
checkStatus(stateMachineInstance, acceptStatus, null, stateMachineInstance.getStatus(), null, "forward");
List<StateInstance> actList = stateMachineInstance.getStateList();
if (CollectionUtils.isEmpty(actList)) {
throw new ForwardInvalidException("StateMachineInstance[id:" + stateMachineInstId
+ "] has no stateInstance, please start a new StateMachine execution instead",
FrameworkErrorCode.OperationDenied);
}
StateInstance lastForwardState = findOutLastForwardStateInstance(actList);
if (lastForwardState == null) {
throw new ForwardInvalidException(
"StateMachineInstance[id:" + stateMachineInstId + "] Cannot find last forward execution stateInstance",
FrameworkErrorCode.OperationDenied);
}
ProcessContextBuilder contextBuilder = ProcessContextBuilder.create().withProcessType(ProcessType.STATE_LANG)
.withOperationName(DomainConstants.OPERATION_NAME_FORWARD).withAsyncCallback(callback)
.withStateMachineInstance(stateMachineInstance).withStateInstance(lastForwardState).withStateMachineConfig(
getStateMachineConfig()).withStateMachineEngine(this);
contextBuilder.withIsAsyncExecution(async);
ProcessContext context = contextBuilder.build();
Map<String, Object> contextVariables = getStateMachineContextVariables(stateMachineInstance);
if (replaceParams != null) {
contextVariables.putAll(replaceParams);
}
putBusinessKeyToContextVariables(stateMachineInstance, contextVariables);
ConcurrentHashMap<String, Object> concurrentContextVariables = new ConcurrentHashMap<>(contextVariables.size());
nullSafeCopy(contextVariables, concurrentContextVariables);
context.setVariable(DomainConstants.VAR_NAME_STATEMACHINE_CONTEXT, concurrentContextVariables);
stateMachineInstance.setContext(concurrentContextVariables);
String originStateName = EngineUtils.getOriginStateName(lastForwardState);
State lastState = stateMachineInstance.getStateMachine().getState(originStateName);
Loop loop = LoopTaskUtils.getLoopConfig(context, lastState);
if (null != loop && ExecutionStatus.SU.equals(lastForwardState.getStatus())) {
lastForwardState = LoopTaskUtils.findOutLastNeedForwardStateInstance(context);
}
context.setVariable(lastForwardState.getName() + DomainConstants.VAR_NAME_RETRIED_STATE_INST_ID,
lastForwardState.getId());
if (StateType.SUB_STATE_MACHINE.equals(lastForwardState.getType()) && !ExecutionStatus.SU
.equals(lastForwardState.getCompensationStatus())) {
context.setVariable(DomainConstants.VAR_NAME_IS_FOR_SUB_STATMACHINE_FORWARD, true);
}
if (!ExecutionStatus.SU.equals(lastForwardState.getStatus())) {
lastForwardState.setIgnoreStatus(true);
}
try {
StateInstruction inst = new StateInstruction();
inst.setTenantId(stateMachineInstance.getTenantId());
inst.setStateMachineName(stateMachineInstance.getStateMachine().getName());
if (skip || ExecutionStatus.SU.equals(lastForwardState.getStatus())) {
String next = null;
State state = stateMachineInstance.getStateMachine().getState(EngineUtils.getOriginStateName(lastForwardState));
if (state instanceof AbstractTaskState) {
next = state.getNext();
}
if (StringUtils.isEmpty(next)) {
LOGGER.warn(
"Last Forward execution StateInstance was succeed, and it has not Next State , skip forward "
+ "operation");
return stateMachineInstance;
}
inst.setStateName(next);
} else {
if (ExecutionStatus.RU.equals(lastForwardState.getStatus())
&& !EngineUtils.isTimeout(lastForwardState.getGmtStarted(), stateMachineConfig.getServiceInvokeTimeout())) {
throw new EngineExecutionException(
"State [" + lastForwardState.getName() + "] is running, operation[forward] denied", FrameworkErrorCode.OperationDenied);
}
inst.setStateName(EngineUtils.getOriginStateName(lastForwardState));
}
context.setInstruction(inst);
stateMachineInstance.setStatus(ExecutionStatus.RU);
stateMachineInstance.setRunning(true);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Operation [forward] started stateMachineInstance[id:" + stateMachineInstance.getId() + "]");
}
if (stateMachineInstance.getStateMachine().isPersist()) {
stateMachineConfig.getStateLogStore().recordStateMachineRestarted(stateMachineInstance, context);
}
loop = LoopTaskUtils.getLoopConfig(context, inst.getState(context));
if (null != loop) {
inst.setTemporaryState(new LoopStartStateImpl());
}
if (async) {
stateMachineConfig.getAsyncProcessCtrlEventPublisher().publish(context);
} else {
stateMachineConfig.getProcessCtrlEventPublisher().publish(context);
}
} catch (EngineExecutionException e) {
LOGGER.error("Operation [forward] failed", e);
throw e;
}
return stateMachineInstance;
}