in core/src/main/java/com/alibaba/smart/framework/engine/behavior/base/AbstractActivityBehavior.java [165:220]
/**
 * Checks whether enough executions have arrived at the given join gateway and, if so,
 * completes them.
 *
 * <p>The active executions found on {@code processInstance} via
 * {@link InstanceUtil#findActiveExecution} are merged with {@code activeExecutionList}
 * (entries already present are not added twice). Every merged execution whose
 * process-definition activity id equals {@code gateway.getId()} counts as having reached
 * the join.
 *
 * @param context                                   current execution context, passed on to {@code hookCleanUp}
 * @param gateway                                   the join gateway whose id is matched against each execution
 * @param processInstance                           process instance whose in-memory active executions are inspected
 * @param activeExecutionList                       additional active executions to merge in; {@code null} is treated as empty
 * @param countOfTheJoinLatch                       number of executions that must reach the gateway before the join completes
 * @param forkedExecutionInstanceOfInclusiveGateway passed on to {@code hookCleanUp} once the join completes
 * @return {@code true} while fewer than {@code countOfTheJoinLatch} executions have reached the
 *         gateway (the process stays paused); {@code false} once exactly that many have arrived
 *         and have been marked done
 * @throws EngineException if more executions reached the gateway than {@code countOfTheJoinLatch} allows
 */
protected boolean doa(ExecutionContext context, AbstractGateway gateway, ProcessInstance processInstance, List<ExecutionInstance> activeExecutionList, int countOfTheJoinLatch, ExecutionInstance forkedExecutionInstanceOfInclusiveGateway) {
    List<ExecutionInstance> executionInstanceListFromMemory = InstanceUtil.findActiveExecution(processInstance);

    // Start from the in-memory executions, then append the passed-in ones that are not already present.
    List<ExecutionInstance> mergedExecutionInstanceList = new ArrayList<ExecutionInstance>(executionInstanceListFromMemory);
    if (null != activeExecutionList) {
        for (ExecutionInstance executionInstance : activeExecutionList) {
            if (!mergedExecutionInstanceList.contains(executionInstance)) {
                mergedExecutionInstanceList.add(executionInstance);
            }
        }
    }

    // Collect the executions that are currently sitting on this gateway.
    List<ExecutionInstance> chosenExecutionInstanceList = new ArrayList<ExecutionInstance>(executionInstanceListFromMemory.size());
    for (ExecutionInstance executionInstance : mergedExecutionInstanceList) {
        if (executionInstance.getProcessDefinitionActivityId().equals(gateway.getId())) {
            chosenExecutionInstanceList.add(executionInstance);
        }
    }
    int reachedJoinCounter = chosenExecutionInstanceList.size();

    LOGGER.debug("chosenExecutionInstanceList , reachedJoinCounter,countOfTheJoinLatch is {} , {} , {} ",chosenExecutionInstanceList,reachedJoinCounter, countOfTheJoinLatch);

    if (reachedJoinCounter > countOfTheJoinLatch) {
        throw new EngineException("Unexpected behavior,reachedJoinCounter: " + reachedJoinCounter +",countOfTheJoinLatch: "+ countOfTheJoinLatch);
    }

    if (reachedJoinCounter < countOfTheJoinLatch) {
        // Join not complete yet: the process stays paused.
        return true;
    }

    // Complete every execution instance currently waiting at the join node; they are then
    // ignored automatically when the state is persisted.
    for (ExecutionInstance executionInstance : chosenExecutionInstanceList) {
        MarkDoneUtil.markDoneExecutionInstance(executionInstance, executionInstanceStorage,
            processEngineConfiguration);
    }

    // The blockId on the context is deliberately NOT cleared here (no context.setBlockId(null)):
    // although this fork-join pair has finished, the unbalanced scenario still needs the
    // blockId for its calculation.
    hookCleanUp(context, forkedExecutionInstanceOfInclusiveGateway);
    return false;
}