protected void doBatchOperateWorkflowDefinition()

in dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java [2095:2228]


    protected void doBatchOperateWorkflowDefinition(User loginUser,
                                                    long targetProjectCode,
                                                    List<String> failedWorkflowList,
                                                    String workflowDefinitionCodes,
                                                    Map<String, Object> result,
                                                    boolean isCopy) {
        Set<Long> definitionCodes = Arrays.stream(workflowDefinitionCodes.split(Constants.COMMA)).map(Long::parseLong)
                .collect(Collectors.toSet());
        List<WorkflowDefinition> workflowDefinitionList = workflowDefinitionMapper.queryByCodes(definitionCodes);
        Set<Long> queryCodes =
                workflowDefinitionList.stream().map(WorkflowDefinition::getCode).collect(Collectors.toSet());
        // definitionCodes - queryCodes
        Set<Long> diffCode =
                definitionCodes.stream().filter(code -> !queryCodes.contains(code)).collect(Collectors.toSet());
        diffCode.forEach(code -> failedWorkflowList.add(code + "[null]"));
        for (WorkflowDefinition workflowDefinition : workflowDefinitionList) {
            List<WorkflowTaskRelation> workflowTaskRelations =
                    workflowTaskRelationMapper.queryByWorkflowDefinitionCode(workflowDefinition.getCode());
            List<WorkflowTaskRelationLog> taskRelationList =
                    workflowTaskRelations.stream().map(WorkflowTaskRelationLog::new).collect(Collectors.toList());
            workflowDefinition.setProjectCode(targetProjectCode);
            if (isCopy) {
                log.info("Copy workflow definition...");
                List<TaskDefinitionLog> taskDefinitionLogs =
                        taskDefinitionLogDao.queryTaskDefineLogList(workflowTaskRelations);
                Map<Long, Long> taskCodeMap = new HashMap<>();
                taskDefinitionLogs.forEach(taskDefinitionLog -> {
                    try {
                        taskCodeMap.put(taskDefinitionLog.getCode(), CodeGenerateUtils.genCode());
                    } catch (CodeGenerateException e) {
                        log.error("Generate task definition code error, projectCode:{}.", targetProjectCode, e);
                        putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
                        throw new ServiceException(Status.INTERNAL_SERVER_ERROR_ARGS);
                    }
                });
                for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
                    taskDefinitionLog.setCode(taskCodeMap.get(taskDefinitionLog.getCode()));
                    taskDefinitionLog.setProjectCode(targetProjectCode);
                    taskDefinitionLog.setVersion(0);
                    taskDefinitionLog.setName(taskDefinitionLog.getName());
                    if (TaskTypeUtils.isSwitchTask(taskDefinitionLog.getTaskType())) {
                        final String taskParams = taskDefinitionLog.getTaskParams();
                        final SwitchParameters switchParameters =
                                JSONUtils.parseObject(taskParams, SwitchParameters.class);
                        if (switchParameters == null) {
                            throw new IllegalArgumentException(
                                    "Switch task params: " + taskParams + " is invalid.");
                        }
                        SwitchParameters.SwitchResult switchResult = switchParameters.getSwitchResult();
                        switchResult.getDependTaskList().forEach(switchResultVo -> {
                            switchResultVo.setNextNode(taskCodeMap.get(switchResultVo.getNextNode()));
                        });
                        if (switchResult.getNextNode() != null) {
                            switchResult.setNextNode(
                                    taskCodeMap.get(switchResult.getNextNode()));
                        }
                        taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(switchParameters));
                    }
                }
                for (WorkflowTaskRelationLog workflowTaskRelationLog : taskRelationList) {
                    if (workflowTaskRelationLog.getPreTaskCode() > 0) {
                        workflowTaskRelationLog
                                .setPreTaskCode(taskCodeMap.get(workflowTaskRelationLog.getPreTaskCode()));
                    }
                    if (workflowTaskRelationLog.getPostTaskCode() > 0) {
                        workflowTaskRelationLog
                                .setPostTaskCode(taskCodeMap.get(workflowTaskRelationLog.getPostTaskCode()));
                    }
                }
                final long oldWorkflowDefinitionCode = workflowDefinition.getCode();
                try {
                    workflowDefinition.setCode(CodeGenerateUtils.genCode());
                } catch (CodeGenerateException e) {
                    log.error("Generate workflow definition code error, projectCode:{}.", targetProjectCode, e);
                    putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
                    throw new ServiceException(Status.INTERNAL_SERVER_ERROR_ARGS);
                }
                workflowDefinition.setId(null);
                workflowDefinition.setUserId(loginUser.getId());
                workflowDefinition.setName(getNewName(workflowDefinition.getName(), COPY_SUFFIX));
                final Date date = new Date();
                workflowDefinition.setCreateTime(date);
                workflowDefinition.setUpdateTime(date);
                workflowDefinition.setReleaseState(ReleaseState.OFFLINE);
                if (StringUtils.isNotBlank(workflowDefinition.getLocations())) {
                    ArrayNode jsonNodes = JSONUtils.parseArray(workflowDefinition.getLocations());
                    for (int i = 0; i < jsonNodes.size(); i++) {
                        ObjectNode node = (ObjectNode) jsonNodes.path(i);
                        node.put("taskCode", taskCodeMap.get(node.get("taskCode").asLong()));
                        jsonNodes.set(i, node);
                    }
                    workflowDefinition.setLocations(JSONUtils.toJsonString(jsonNodes));
                }
                // copy timing configuration
                Schedule scheduleObj = scheduleMapper.queryByWorkflowDefinitionCode(oldWorkflowDefinitionCode);
                if (scheduleObj != null) {
                    scheduleObj.setId(null);
                    scheduleObj.setUserId(loginUser.getId());
                    scheduleObj.setWorkflowDefinitionCode(workflowDefinition.getCode());
                    scheduleObj.setReleaseState(ReleaseState.OFFLINE);
                    scheduleObj.setCreateTime(date);
                    scheduleObj.setUpdateTime(date);
                    int insertResult = scheduleMapper.insert(scheduleObj);
                    if (insertResult != 1) {
                        log.error("Schedule create error, workflowDefinitionCode:{}.", workflowDefinition.getCode());
                        putMsg(result, Status.CREATE_SCHEDULE_ERROR);
                        throw new ServiceException(Status.CREATE_SCHEDULE_ERROR);
                    }
                }
                try {
                    result.putAll(createDagDefine(loginUser, taskRelationList, workflowDefinition, taskDefinitionLogs));
                } catch (Exception e) {
                    log.error("Copy workflow definition error, workflowDefinitionCode from {} to {}.",
                            oldWorkflowDefinitionCode, workflowDefinition.getCode(), e);
                    putMsg(result, Status.COPY_WORKFLOW_DEFINITION_ERROR);
                    throw new ServiceException(Status.COPY_WORKFLOW_DEFINITION_ERROR);
                }
            } else {
                log.info("Move workflow definition...");
                try {
                    result.putAll(updateDagDefine(loginUser, taskRelationList, workflowDefinition, null,
                            Lists.newArrayList()));
                } catch (Exception e) {
                    log.error("Move workflow definition error, workflowDefinitionCode:{}.",
                            workflowDefinition.getCode(), e);
                    putMsg(result, Status.MOVE_WORKFLOW_DEFINITION_ERROR);
                    throw new ServiceException(Status.MOVE_WORKFLOW_DEFINITION_ERROR);
                }
            }
            if (result.get(Constants.STATUS) != Status.SUCCESS) {
                failedWorkflowList.add(workflowDefinition.getCode() + "[" + workflowDefinition.getName() + "]");
            }
        }
    }