public void orchestrateService()

in core/src/main/java/com/alibaba/smart/framework/engine/configuration/impl/DefaultParallelServiceOrchestration.java [38:94]


    /**
     * Handles a parallel gateway activity in the service-orchestration flow.
     *
     * <p>The branch taken depends on the number of incoming and outgoing transitions of
     * {@code pvmActivity}:
     * <ul>
     *   <li><b>fork</b> (exactly one incoming, two or more outgoing): the task list is prepared by
     *       {@code initMultiTaskRequestAndFindOutJoinActivity}, handed to {@code invoke} together with
     *       the selected executor, the results are processed by {@code acquireFutureResult}, and
     *       finally the returned join activity's behavior is asked to {@code leave} on the calling
     *       thread;</li>
     *   <li><b>join</b> (two or more incoming, exactly one outgoing): nothing is done here;</li>
     *   <li>any other combination is rejected.</li>
     * </ul>
     *
     * @param context     the current execution context; also supplies the process engine configuration
     * @param pvmActivity the gateway activity being executed
     * @throws EngineException if the transition counts match neither fork nor join, or if any
     *                         exception is raised while executing the fork branch (wrapped as cause)
     */
    public void orchestrateService(ExecutionContext context, PvmActivity pvmActivity) {

        Map<String, PvmTransition> incomeTransitionMap = pvmActivity.getIncomeTransitions();
        Map<String, PvmTransition> outcomeTransitionMap = pvmActivity.getOutcomeTransitions();

        int outComeTransitionSize = outcomeTransitionMap.size();
        int inComeTransitionSize = incomeTransitionMap.size();

        // fork
        if (outComeTransitionSize >= 2 && inComeTransitionSize == 1) {

            ProcessEngineConfiguration processEngineConfiguration = context.getProcessEngineConfiguration();
            AnnotationScanner annotationScanner = processEngineConfiguration.getAnnotationScanner();
            ContextFactory contextFactory = annotationScanner.getExtensionPoint(ExtensionConstant.COMMON, ContextFactory.class);
            Map<String, String> properties = pvmActivity.getModel().getProperties();
            Set<Entry<String, PvmTransition>> entries = outcomeTransitionMap.entrySet();

            Long latchWaitTime = acquireLatchWaitTime(context, properties);
            ParallelGatewayConstant.ExecuteStrategy executeStrategy = getExecuteStrategy(properties);
            boolean isSkipTimeout = isSkipTimeout((String) MapUtil.safeGet(properties, ParallelGatewayConstant.SKIP_TIMEOUT_EXCEPTION));

            // Note: if a custom thread pool can be matched, it is used directly.
            // The parallel gateway can be extended with attributes such as:
            // timeout="300" strategy="any" poolName="poolA" skipTimeoutExp="true".
            // See ServiceOrchestrationParallelGatewayTest for usage.
            ExecutorService executorService = useSpecifiedExecutorServiceIfNeeded(properties, processEngineConfiguration);

            List<PvmActivityTask> pvmActivityTaskList = new ArrayList<PvmActivityTask>(outComeTransitionSize);

            try {

                PvmActivity finalJoinPvmActivity = initMultiTaskRequestAndFindOutJoinActivity(context, contextFactory, pvmActivityTaskList, entries);

                List<Future<ExecutionContext>> futureExecutionResultList = invoke(latchWaitTime, isSkipTimeout, executeStrategy, executorService, pvmActivityTaskList);

                acquireFutureResult(context, processEngineConfiguration, latchWaitTime, isSkipTimeout, futureExecutionResultList);

                ActivityBehavior behavior = finalJoinPvmActivity.getBehavior();

                // Simulate the normal flow being driven onward: the caller thread goes on to
                // execute the nodes that follow the join.
                behavior.leave(context, finalJoinPvmActivity);

            } catch (Exception e) {
                // Wrapping an InterruptedException would otherwise lose the interrupt request,
                // so restore the flag for code further up the stack before rethrowing.
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                throw new EngineException(e);
            }

        } else if (outComeTransitionSize == 1 && inComeTransitionSize >= 2) {
            // join
            // In the service-orchestration scenario only the child threads reach the parallel
            // gateway's join node, after executing their last node. The caller thread never
            // runs this branch.
            // TUNE: after the refactoring, setting the PvmActivity on the session seems pointless;
            // the join node can be computed directly from the process definition.
//                GatewaySticker.currentSession().setPvmActivity(pvmActivity);

        } else {
            throw new EngineException("Unexpected behavior: " + pvmActivity);
        }

    }