in core/src/main/java/com/alibaba/smart/framework/engine/bpmn/behavior/gateway/helper/CommonGatewayHelper.java [169:235]
public static void leaveAndConcurrentlyForkIfNeeded(ExecutionContext context, PvmActivity pvmActivity, Collection<PvmTransition> values ) {
int outComeTransitionSize = values.size();
ExecutorService executorService = context.getProcessEngineConfiguration().getExecutorService();
ProcessEngineConfiguration processEngineConfiguration = context.getProcessEngineConfiguration();
AnnotationScanner annotationScanner = processEngineConfiguration.getAnnotationScanner();
ContextFactory contextFactory = annotationScanner.getExtensionPoint(ExtensionConstant.COMMON, ContextFactory.class);
if(null == executorService){
//顺序执行fork
for (PvmTransition value : values) {
PvmActivity target = value.getTarget();
ExecutionContext childThreadContext = contextFactory.createGatewayContext(context);
target.enter(childThreadContext);
}
}else{
//并发执行fork 算法说明
// 前置: 在流程定义解析阶段需要知道,所有网关是否配对,并且在解析期间进行校验
// 当子线程执行结束时,看下该分支是否到达了fork对应的join(考虑到嵌套), 如果所有分支都已经完成(注意事项:检查到达该fork对应的join节点,需要注意嵌套,父join找父fork,子join找子join),
// 如果在fork主线程中发现都已经完毕(每个子线程当前的最后一个节点是否为对应的join),则调用join节点的enter ; 否则调用返回,等待下一次外部的signal
Map<String, String> properties = pvmActivity.getModel().getProperties();
Long latchWaitTime = acquireLatchWaitTime(context, properties);
ParallelGatewayConstant.ExecuteStrategy executeStrategy = getExecuteStrategy(properties);
boolean isSkipTimeout = isSkipTimeout((String) MapUtil.safeGet(properties, ParallelGatewayConstant.SKIP_TIMEOUT_EXCEPTION));
// 注意: 重新赋值 如果能匹配到自定义的线程池,直接使用。 允许扩展并行网关的3种属性: timeout="300" strategy="any" poolName="poolA" skipTimeoutExp="true" 使用方法详见 ServiceOrchestrationParallelGatewayTest
executorService = useSpecifiedExecutorServiceIfNeeded(properties, processEngineConfiguration);
List<PvmActivityTask> pvmActivityTaskList = new ArrayList<PvmActivityTask>(outComeTransitionSize);
try {
initTaskList(context, contextFactory, values, pvmActivityTaskList);
List<Future<ExecutionContext>> futureExecutionResultList = invoke(latchWaitTime, isSkipTimeout, executeStrategy, executorService, pvmActivityTaskList);
List<ExecutionContext> subThreadContextList = acquireResults(context, processEngineConfiguration, latchWaitTime, futureExecutionResultList);
//这里目前看起来没啥必要了
for (ExecutionContext executionContext : subThreadContextList) {
executionContext.getExecutionInstance().getProcessDefinitionActivityId();
}
} catch (Exception e) {
throw new EngineException(e);
}
}
}