in core/src/main/java/com/alibaba/smart/framework/engine/bpmn/behavior/gateway/ParallelGatewayBehavior.java [75:124]
private boolean processDefaultLogic(ExecutionContext context, PvmActivity pvmActivity, ParallelGateway parallelGateway) {
Map<String, PvmTransition> incomeTransitions = pvmActivity.getIncomeTransitions();
int inComeTransitionSize = incomeTransitions.size();
if (CommonGatewayHelper.isForkGateway(pvmActivity)) {
//fork
super.enter(context, pvmActivity);
// TUNE 这里不太优雅,本来应该在execute方法中返回false的,但是execute的返回值是void,大意了. 暂时先不改了,否则很可能影响现有的用户
// 此外,目前这个类绕过了execute和leave的执行,后面有机会在优化 (并行网关这个类很特殊,既承担了fork又承担了join职责)
context.getExecutionInstance().setActive(false);
fireEvent(context,pvmActivity, EventConstant.ACTIVITY_START);
Collection<PvmTransition> values = pvmActivity.getOutcomeTransitions().values();
CommonGatewayHelper.leaveAndConcurrentlyForkIfNeeded(context, pvmActivity,values);
} else if (CommonGatewayHelper.isJoinGateway(pvmActivity)) {
ProcessInstance processInstance = context.getProcessInstance();
//这个同步很关键,避免多线程同时进入临界区,然后在下面的逻辑里去创建新的 join ei,然后和countOfTheJoinLatch 进行比较
synchronized (processInstance){
super.enter(context, pvmActivity);
Collection<PvmTransition> inComingPvmTransitions = incomeTransitions.values();
//当前内存中的,新产生的 active ExecutionInstance
//当前持久化介质中中,已产生的 active ExecutionInstance。
List<ExecutionInstance> activeExecutionList = executionInstanceStorage.findActiveExecution(processInstance.getInstanceId(), super.processEngineConfiguration);
int countOfTheJoinLatch = inComingPvmTransitions.size();
//Merge 数据库中和内存中的EI。如果是 custom模式,则可能会存在重复记录(因为custom也是从内存中查询),所以这里需要去重。 如果是 DataBase 模式,则不会有重复的EI.
return super.doa(context, parallelGateway, processInstance, activeExecutionList, countOfTheJoinLatch, null);
}
}else{
throw new EngineException("Unexpected behavior: "+pvmActivity);
}
return true;
}