in core/src/main/java/com/alibaba/smart/framework/engine/configuration/impl/DefaultParallelServiceOrchestration.java [38:94]
/**
 * Handles a parallel gateway activity in service-orchestration mode.
 *
 * <p>The gateway's role is decided from its transition counts:
 * <ul>
 *   <li><b>fork</b> (at least two outgoing, exactly one incoming): builds one task per outgoing
 *       transition, invokes them on the selected executor, collects the results, and then calls
 *       {@code leave} on the behavior of the join activity that was located while building the
 *       tasks;</li>
 *   <li><b>join</b> (exactly one outgoing, at least two incoming): does nothing;</li>
 *   <li>any other shape is rejected.</li>
 * </ul>
 *
 * @param context     the current execution context
 * @param pvmActivity the parallel gateway activity being executed
 * @throws EngineException if the gateway is neither a fork nor a join, or wrapping any
 *                         exception raised while running the fork branches
 */
public void orchestrateService(ExecutionContext context, PvmActivity pvmActivity) {
    Map<String, PvmTransition> incoming = pvmActivity.getIncomeTransitions();
    Map<String, PvmTransition> outgoing = pvmActivity.getOutcomeTransitions();
    int outgoingCount = outgoing.size();
    int incomingCount = incoming.size();

    boolean joinGateway = outgoingCount == 1 && incomingCount >= 2;
    if (joinGateway) {
        // Join: in the service-orchestration scenario only the child threads arrive here, after
        // finishing their last node; the caller thread never executes this branch.
        // TUNE: after the refactoring, storing the PvmActivity in the session
        // (GatewaySticker.currentSession().setPvmActivity(pvmActivity)) no longer seems useful;
        // the join node can be computed directly from the process definition.
        return;
    }

    boolean forkGateway = outgoingCount >= 2 && incomingCount == 1;
    if (!forkGateway) {
        throw new EngineException("Unexpected behavior: " + pvmActivity);
    }

    // Fork: gather everything needed to run the outgoing branches.
    ProcessEngineConfiguration engineConfiguration = context.getProcessEngineConfiguration();
    ContextFactory contextFactory = engineConfiguration.getAnnotationScanner()
            .getExtensionPoint(ExtensionConstant.COMMON, ContextFactory.class);

    Map<String, String> gatewayProperties = pvmActivity.getModel().getProperties();
    Set<Entry<String, PvmTransition>> outgoingEntries = outgoing.entrySet();

    Long waitTime = acquireLatchWaitTime(context, gatewayProperties);
    ParallelGatewayConstant.ExecuteStrategy strategy = getExecuteStrategy(gatewayProperties);
    String skipTimeoutFlag =
            (String) MapUtil.safeGet(gatewayProperties, ParallelGatewayConstant.SKIP_TIMEOUT_EXCEPTION);
    boolean skipTimeout = isSkipTimeout(skipTimeoutFlag);

    // Note: a custom thread pool is used directly when one matches. The parallel gateway may be
    // extended with these attributes: timeout="300" strategy="any" poolName="poolA"
    // skipTimeoutExp="true". See ServiceOrchestrationParallelGatewayTest for usage.
    ExecutorService pool = useSpecifiedExecutorServiceIfNeeded(gatewayProperties, engineConfiguration);

    List<PvmActivityTask> branchTasks = new ArrayList<PvmActivityTask>(outgoingCount);
    try {
        PvmActivity joinActivity =
                initMultiTaskRequestAndFindOutJoinActivity(context, contextFactory, branchTasks, outgoingEntries);
        List<Future<ExecutionContext>> branchFutures =
                invoke(waitTime, skipTimeout, strategy, pool, branchTasks);
        acquireFutureResult(context, engineConfiguration, waitTime, skipTimeout, branchFutures);

        // Mimic the normal flow: keep driving the process so the caller thread goes on to
        // execute the nodes that follow the join.
        joinActivity.getBehavior().leave(context, joinActivity);
    } catch (Exception e) {
        throw new EngineException(e);
    }
}