protected int createComplementCommandList()

in dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java [466:560]


    protected int createComplementCommandList(Long triggerCode, String scheduleTimeParam, RunMode runMode,
                                              Command command,
                                              Integer expectedParallelismNumber,
                                              ComplementDependentMode complementDependentMode,
                                              boolean allLevelDependent,
                                              ExecutionOrder executionOrder) throws CronParseException {
        int createCount = 0;
        int dependentWorkflowDefinitionCreateCount = 0;
        runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
        Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
        Map<String, String> scheduleParam = JSONUtils.toMap(scheduleTimeParam);

        if (Objects.isNull(executionOrder)) {
            executionOrder = ExecutionOrder.DESC_ORDER;
        }

        List<Schedule> schedules = processService.queryReleaseSchedulerListByWorkflowDefinitionCode(
                command.getWorkflowDefinitionCode());

        List<ZonedDateTime> listDate = new ArrayList<>();
        if (scheduleParam.containsKey(CMD_PARAM_COMPLEMENT_DATA_START_DATE) && scheduleParam.containsKey(
                CMD_PARAM_COMPLEMENT_DATA_END_DATE)) {
            String startDate = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_START_DATE);
            String endDate = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_END_DATE);
            if (startDate != null && endDate != null) {
                listDate = CronUtils.getSelfFireDateList(
                        DateUtils.stringToZoneDateTime(startDate),
                        DateUtils.stringToZoneDateTime(endDate),
                        schedules);
            }
        }

        if (scheduleParam.containsKey(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
            String dateList = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST);

            if (StringUtils.isNotBlank(dateList)) {
                listDate = Splitter.on(COMMA).splitToStream(dateList)
                        .map(item -> DateUtils.stringToZoneDateTime(item.trim()))
                        .distinct()
                        .collect(Collectors.toList());
            }
        }

        if (CollectionUtils.isEmpty(listDate)) {
            throw new ServiceException(Status.TASK_COMPLEMENT_DATA_DATE_ERROR);
        }

        if (executionOrder.equals(ExecutionOrder.DESC_ORDER)) {
            Collections.sort(listDate, Collections.reverseOrder());
        } else {
            Collections.sort(listDate);
        }

        switch (runMode) {
            case RUN_MODE_SERIAL: {
                log.info("RunMode of {} command is serial run, workflowDefinitionCode:{}.",
                        command.getCommandType().getDescp(), command.getWorkflowDefinitionCode());
                createCount = createComplementCommand(triggerCode, command, cmdParam, listDate, schedules,
                        complementDependentMode, allLevelDependent);
                break;
            }
            case RUN_MODE_PARALLEL: {
                log.info("RunMode of {} command is parallel run, workflowDefinitionCode:{}.",
                        command.getCommandType().getDescp(), command.getWorkflowDefinitionCode());

                int queueNum = 0;
                if (CollectionUtils.isNotEmpty(listDate)) {
                    queueNum = listDate.size();
                    if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {
                        queueNum = Math.min(queueNum, expectedParallelismNumber);
                    }
                    log.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.",
                            queueNum);
                    List[] queues = new List[queueNum];

                    for (int i = 0; i < listDate.size(); i++) {
                        if (Objects.isNull(queues[i % queueNum])) {
                            queues[i % queueNum] = new ArrayList();
                        }
                        queues[i % queueNum].add(listDate.get(i));
                    }
                    for (List queue : queues) {
                        createCount = createComplementCommand(triggerCode, command, cmdParam, queue, schedules,
                                complementDependentMode, allLevelDependent);
                    }
                }
                break;
            }
            default:
                break;
        }
        log.info("Create complement command count:{}, Create dependent complement command count:{}", createCount,
                dependentWorkflowDefinitionCreateCount);
        return createCount;
    }