in dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java [466:560]
protected int createComplementCommandList(Long triggerCode, String scheduleTimeParam, RunMode runMode,
Command command,
Integer expectedParallelismNumber,
ComplementDependentMode complementDependentMode,
boolean allLevelDependent,
ExecutionOrder executionOrder) throws CronParseException {
int createCount = 0;
int dependentWorkflowDefinitionCreateCount = 0;
runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
Map<String, String> scheduleParam = JSONUtils.toMap(scheduleTimeParam);
if (Objects.isNull(executionOrder)) {
executionOrder = ExecutionOrder.DESC_ORDER;
}
List<Schedule> schedules = processService.queryReleaseSchedulerListByWorkflowDefinitionCode(
command.getWorkflowDefinitionCode());
List<ZonedDateTime> listDate = new ArrayList<>();
if (scheduleParam.containsKey(CMD_PARAM_COMPLEMENT_DATA_START_DATE) && scheduleParam.containsKey(
CMD_PARAM_COMPLEMENT_DATA_END_DATE)) {
String startDate = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_START_DATE);
String endDate = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_END_DATE);
if (startDate != null && endDate != null) {
listDate = CronUtils.getSelfFireDateList(
DateUtils.stringToZoneDateTime(startDate),
DateUtils.stringToZoneDateTime(endDate),
schedules);
}
}
if (scheduleParam.containsKey(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
String dateList = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST);
if (StringUtils.isNotBlank(dateList)) {
listDate = Splitter.on(COMMA).splitToStream(dateList)
.map(item -> DateUtils.stringToZoneDateTime(item.trim()))
.distinct()
.collect(Collectors.toList());
}
}
if (CollectionUtils.isEmpty(listDate)) {
throw new ServiceException(Status.TASK_COMPLEMENT_DATA_DATE_ERROR);
}
if (executionOrder.equals(ExecutionOrder.DESC_ORDER)) {
Collections.sort(listDate, Collections.reverseOrder());
} else {
Collections.sort(listDate);
}
switch (runMode) {
case RUN_MODE_SERIAL: {
log.info("RunMode of {} command is serial run, workflowDefinitionCode:{}.",
command.getCommandType().getDescp(), command.getWorkflowDefinitionCode());
createCount = createComplementCommand(triggerCode, command, cmdParam, listDate, schedules,
complementDependentMode, allLevelDependent);
break;
}
case RUN_MODE_PARALLEL: {
log.info("RunMode of {} command is parallel run, workflowDefinitionCode:{}.",
command.getCommandType().getDescp(), command.getWorkflowDefinitionCode());
int queueNum = 0;
if (CollectionUtils.isNotEmpty(listDate)) {
queueNum = listDate.size();
if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {
queueNum = Math.min(queueNum, expectedParallelismNumber);
}
log.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.",
queueNum);
List[] queues = new List[queueNum];
for (int i = 0; i < listDate.size(); i++) {
if (Objects.isNull(queues[i % queueNum])) {
queues[i % queueNum] = new ArrayList();
}
queues[i % queueNum].add(listDate.get(i));
}
for (List queue : queues) {
createCount = createComplementCommand(triggerCode, command, cmdParam, queue, schedules,
complementDependentMode, allLevelDependent);
}
}
break;
}
default:
break;
}
log.info("Create complement command count:{}, Create dependent complement command count:{}", createCount,
dependentWorkflowDefinitionCreateCount);
return createCount;
}