public Map updateTaskWithUpstream()

in dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java [492:566]


    public Map<String, Object> updateTaskWithUpstream(User loginUser, long projectCode, long taskCode,
                                                      String taskDefinitionJsonObj, String upstreamCodes) {
        Map<String, Object> result = new HashMap<>();
        TaskDefinitionLog taskDefinitionToUpdate =
                updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj, result);
        List<WorkflowTaskRelation> upstreamTaskRelations =
                workflowTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
        Set<Long> upstreamCodeSet =
                upstreamTaskRelations.stream().map(WorkflowTaskRelation::getPreTaskCode).collect(Collectors.toSet());
        Set<Long> upstreamTaskCodes = Collections.emptySet();
        if (StringUtils.isNotEmpty(upstreamCodes)) {
            upstreamTaskCodes = Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong)
                    .collect(Collectors.toSet());
        }
        if (CollectionUtils.isEqualCollection(upstreamCodeSet, upstreamTaskCodes) && taskDefinitionToUpdate == null) {
            putMsg(result, Status.SUCCESS);
            return result;
        }
        Map<Long, TaskDefinition> queryUpStreamTaskCodeMap;
        if (CollectionUtils.isNotEmpty(upstreamTaskCodes)) {
            List<TaskDefinition> upstreamTaskDefinitionList = taskDefinitionMapper.queryByCodeList(upstreamTaskCodes);
            queryUpStreamTaskCodeMap = upstreamTaskDefinitionList.stream()
                    .collect(Collectors.toMap(TaskDefinition::getCode, taskDefinition -> taskDefinition));
            // upstreamTaskCodes - queryUpStreamTaskCodeMap.keySet
            upstreamTaskCodes.removeAll(queryUpStreamTaskCodeMap.keySet());
            if (CollectionUtils.isNotEmpty(upstreamTaskCodes)) {
                String notExistTaskCodes = StringUtils.join(upstreamTaskCodes, Constants.COMMA);
                log.error("Some task definitions in parameter upstreamTaskCodes do not exist, notExistTaskCodes:{}.",
                        notExistTaskCodes);
                putMsg(result, Status.TASK_DEFINE_NOT_EXIST, notExistTaskCodes);
                return result;
            }
        } else {
            queryUpStreamTaskCodeMap = new HashMap<>();
        }
        if (MapUtils.isNotEmpty(queryUpStreamTaskCodeMap)) {
            WorkflowTaskRelation taskRelation = upstreamTaskRelations.get(0);
            List<WorkflowTaskRelation> workflowTaskRelations =
                    workflowTaskRelationMapper.queryByWorkflowDefinitionCode(taskRelation.getWorkflowDefinitionCode());

            // set upstream code list
            updateUpstreamTask(new HashSet<>(queryUpStreamTaskCodeMap.keySet()),
                    taskCode, projectCode, taskRelation.getWorkflowDefinitionCode(), loginUser);

            List<WorkflowTaskRelation> workflowTaskRelationList = Lists.newArrayList(workflowTaskRelations);
            List<WorkflowTaskRelation> relationList = Lists.newArrayList();
            for (WorkflowTaskRelation workflowTaskRelation : workflowTaskRelationList) {
                if (workflowTaskRelation.getPostTaskCode() == taskCode) {
                    if (queryUpStreamTaskCodeMap.containsKey(workflowTaskRelation.getPreTaskCode())
                            && workflowTaskRelation.getPreTaskCode() != 0L) {
                        queryUpStreamTaskCodeMap.remove(workflowTaskRelation.getPreTaskCode());
                    } else {
                        workflowTaskRelation.setPreTaskCode(0L);
                        workflowTaskRelation.setPreTaskVersion(0);
                        relationList.add(workflowTaskRelation);
                    }
                }
            }
            workflowTaskRelationList.removeAll(relationList);
            for (Map.Entry<Long, TaskDefinition> queryUpStreamTask : queryUpStreamTaskCodeMap.entrySet()) {
                taskRelation.setPreTaskCode(queryUpStreamTask.getKey());
                taskRelation.setPreTaskVersion(queryUpStreamTask.getValue().getVersion());
                workflowTaskRelationList.add(taskRelation);
            }
            if (MapUtils.isEmpty(queryUpStreamTaskCodeMap) && CollectionUtils.isNotEmpty(workflowTaskRelationList)) {
                workflowTaskRelationList.add(workflowTaskRelationList.get(0));
            }
        }
        log.info(
                "Update task with upstream tasks complete, projectCode:{}, taskDefinitionCode:{}, upstreamTaskCodes:{}.",
                projectCode, taskCode, upstreamTaskCodes);
        result.put(Constants.DATA_LIST, taskCode);
        putMsg(result, Status.SUCCESS);
        return result;
    }