public List updateUpstreamTaskDefinitionWithSyncDag()

in dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowTaskRelationServiceImpl.java [395:492]


    public List<WorkflowTaskRelation> updateUpstreamTaskDefinitionWithSyncDag(User loginUser,
                                                                              long taskCode,
                                                                              Boolean needSyncDag,
                                                                              TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest) {
        TaskDefinition downstreamTask = taskDefinitionMapper.queryByCode(taskCode);
        if (downstreamTask == null) {
            throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, taskCode);
        }
        List<Long> upstreamTaskCodes = taskRelationUpdateUpstreamRequest.getUpstreams();

        WorkflowTaskRelation workflowTaskRelation = new WorkflowTaskRelation();
        workflowTaskRelation.setPostTaskCode(taskCode);

        Page<WorkflowTaskRelation> page = new Page<>(taskRelationUpdateUpstreamRequest.getPageNo(),
                taskRelationUpdateUpstreamRequest.getPageSize());
        IPage<WorkflowTaskRelation> workflowTaskRelationExistsIPage =
                workflowTaskRelationMapper.filterWorkflowTaskRelation(page, workflowTaskRelation);
        List<WorkflowTaskRelation> workflowTaskRelationExists = workflowTaskRelationExistsIPage.getRecords();

        WorkflowDefinition workflowDefinition = null;
        if (CollectionUtils.isNotEmpty(workflowTaskRelationExists)) {
            workflowDefinition =
                    workflowDefinitionMapper.queryByCode(workflowTaskRelationExists.get(0).getWorkflowDefinitionCode());
        } else if (taskRelationUpdateUpstreamRequest.getWorkflowCode() != 0L) {
            workflowDefinition =
                    workflowDefinitionMapper.queryByCode(taskRelationUpdateUpstreamRequest.getWorkflowCode());
        }
        if (workflowDefinition == null) {
            throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR,
                    taskRelationUpdateUpstreamRequest.toString());
        }
        workflowDefinition.setUpdateTime(new Date());
        int insertVersion = workflowDefinition.getVersion();
        if (needSyncDag) {
            insertVersion =
                    this.saveWorkflowDefinition(loginUser, workflowDefinition);
            if (insertVersion <= 0) {
                throw new ServiceException(Status.UPDATE_WORKFLOW_DEFINITION_ERROR);
            }
        }
        // get new relation to create and out of date relation to delete
        List<Long> taskCodeCreates = upstreamTaskCodes
                .stream()
                .filter(upstreamTaskCode -> workflowTaskRelationExists.stream().noneMatch(
                        workflowTaskRelation1 -> workflowTaskRelation1.getPreTaskCode() == upstreamTaskCode))
                .collect(Collectors.toList());
        List<Integer> taskCodeDeletes = workflowTaskRelationExists.stream()
                .filter(ptr -> !upstreamTaskCodes.contains(ptr.getPreTaskCode()))
                .map(WorkflowTaskRelation::getId)
                .collect(Collectors.toList());

        // delete relation not exists
        if (CollectionUtils.isNotEmpty(taskCodeDeletes)) {
            int delete = workflowTaskRelationMapper.deleteBatchIds(taskCodeDeletes);
            if (delete != taskCodeDeletes.size()) {
                throw new ServiceException(Status.WORKFLOW_TASK_RELATION_BATCH_DELETE_ERROR, taskCodeDeletes);
            }
        }

        // create relation not exists
        List<WorkflowTaskRelation> workflowTaskRelations = new ArrayList<>();
        for (long createCode : taskCodeCreates) {
            long upstreamCode = 0L;
            int version = 0;
            if (createCode != 0L) {
                // 0 for DAG root, should not, it may already exists and skip to create anymore
                TaskDefinition upstreamTask = taskDefinitionMapper.queryByCode(createCode);
                if (upstreamTask == null) {
                    throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, createCode);
                }
                upstreamCode = upstreamTask.getCode();
                version = upstreamTask.getVersion();
            }
            WorkflowTaskRelation workflowTaskRelationCreate =
                    new WorkflowTaskRelation(null, workflowDefinition.getVersion(), downstreamTask.getProjectCode(),
                            workflowDefinition.getCode(), upstreamCode, version,
                            downstreamTask.getCode(), downstreamTask.getVersion(), null, null);
            workflowTaskRelations.add(workflowTaskRelationCreate);
        }
        int batchInsert = workflowTaskRelationMapper.batchInsert(workflowTaskRelations);
        if (batchInsert != workflowTaskRelations.size()) {
            throw new ServiceException(Status.WORKFLOW_TASK_RELATION_BATCH_CREATE_ERROR, taskCodeCreates);
        }

        // batch sync to workflow task relation log
        int saveTaskRelationResult = saveTaskRelation(loginUser, workflowDefinition, insertVersion);
        if (saveTaskRelationResult != Constants.EXIT_CODE_SUCCESS) {
            log.error(
                    "Save workflow task relations error, projectCode:{}, workflowDefinitionCode:{}, workflowDefinitionVersion:{}.",
                    workflowDefinition.getProjectCode(), workflowDefinition.getCode(), insertVersion);
            throw new ServiceException(Status.CREATE_WORKFLOW_TASK_RELATION_ERROR);
        }
        log.info(
                "Save workflow task relations complete, projectCode:{}, workflowDefinitionCode:{}, workflowDefinitionVersion:{}.",
                workflowDefinition.getProjectCode(), workflowDefinition.getCode(), insertVersion);
        workflowTaskRelations.get(0).setWorkflowDefinitionVersion(insertVersion);
        return workflowTaskRelations;
    }