private TaskDefinitionLog updateTask()

in dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java [345:479]


    /**
     * Updates an existing task definition from its JSON representation and writes the new version to the
     * task definition log. Every workflow task relation that referenced the previous task version is then
     * re-pointed at the new version, and each owning workflow definition is bumped to a new version with
     * a matching workflow definition log entry.
     *
     * <p>Validation failures return {@code null} after a status has been recorded through {@code putMsg}
     * (for the permission check, the status is presumably recorded by {@code hasProjectAndWritePerm},
     * which receives {@code result}). Persistence failures record a status and then throw.
     *
     * @param loginUser             user performing the update; its id is stored as operator on the log rows
     * @param projectCode           code of the project the task belongs to
     * @param taskCode              code of the task definition to update
     * @param taskDefinitionJsonObj JSON that is parsed into a {@link TaskDefinitionLog}
     * @param result                map that receives the status of the operation
     * @return the persisted task definition log carrying the new version, or {@code null} when the request
     *         was rejected before anything was written
     * @throws ServiceException if any update/insert does not affect exactly one row, or if a workflow
     *                          definition referenced by a relation (or its max log version) cannot be found
     */
    private TaskDefinitionLog updateTask(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj,
                                         Map<String, Object> result) {
        Project project = projectMapper.queryByCode(projectCode);

        // check if user have write perm for project
        boolean hasProjectAndWritePerm = projectService.hasProjectAndWritePerm(loginUser, project, result);
        if (!hasProjectAndWritePerm) {
            return null;
        }

        TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
        if (taskDefinition == null) {
            log.error("Task definition does not exist, taskDefinitionCode:{}.", taskCode);
            putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode));
            return null;
        }
        // if stream, can update task definition without online check
        if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES
                && taskDefinition.getTaskExecuteType() != TaskExecuteType.STREAM) {
            log.warn("Only {} type task can be updated without online check, taskDefinitionCode:{}.",
                    TaskExecuteType.STREAM, taskCode);
            putMsg(result, Status.NOT_SUPPORT_UPDATE_TASK_DEFINITION);
            return null;
        }

        TaskDefinitionLog taskDefinitionToUpdate =
                JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
        // Reject an unparsable payload before it is used for anything else.
        if (taskDefinitionToUpdate == null) {
            log.warn("Parameter taskDefinitionJson is invalid.");
            putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj);
            return null;
        }
        // Clear the notify strategy on the stored definition when its timeout is closed, so the
        // equality check below is made against the stored definition without that strategy.
        if (TimeoutFlag.CLOSE == taskDefinition.getTimeoutFlag()) {
            taskDefinition.setTimeoutNotifyStrategy(null);
        }
        if (taskDefinition.equals(taskDefinitionToUpdate)) {
            log.warn("Task definition does not need update because no change, taskDefinitionCode:{}.", taskCode);
            putMsg(result, Status.TASK_DEFINITION_NOT_MODIFY_ERROR, String.valueOf(taskCode));
            return null;
        }
        if (!checkTaskParameters(taskDefinitionToUpdate.getTaskType(), taskDefinitionToUpdate.getTaskParams())) {
            putMsg(result, Status.WORKFLOW_NODE_S_PARAMETER_INVALID, taskDefinitionToUpdate.getName());
            return null;
        }
        Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode);
        if (version == null || version == 0) {
            log.error("Max version task definitionLog can not be found in database, taskDefinitionCode:{}.",
                    taskCode);
            putMsg(result, Status.DATA_IS_NOT_VALID, taskCode);
            return null;
        }

        int newTaskVersion = version + 1;
        Date now = new Date();
        taskDefinitionToUpdate.setCode(taskCode);
        taskDefinitionToUpdate.setId(taskDefinition.getId());
        taskDefinitionToUpdate.setProjectCode(projectCode);
        taskDefinitionToUpdate.setUserId(taskDefinition.getUserId());
        taskDefinitionToUpdate.setVersion(newTaskVersion);
        // Locale.ROOT keeps the normalized type stable regardless of the JVM default locale.
        taskDefinitionToUpdate
                .setTaskType(taskDefinitionToUpdate.getTaskType().toUpperCase(java.util.Locale.ROOT));
        taskDefinitionToUpdate.setResourceIds(processService.getResourceIds(taskDefinitionToUpdate));
        taskDefinitionToUpdate.setUpdateTime(now);
        int update = taskDefinitionMapper.updateById(taskDefinitionToUpdate);

        // The same object is reused as the log row: fill in the audit fields and drop the id
        // before inserting it into the log table.
        taskDefinitionToUpdate.setOperator(loginUser.getId());
        taskDefinitionToUpdate.setOperateTime(now);
        taskDefinitionToUpdate.setCreateTime(now);
        taskDefinitionToUpdate.setId(null);
        int insert = taskDefinitionLogMapper.insert(taskDefinitionToUpdate);
        if (update != 1 || insert != 1) {
            log.error("Update task definition or definitionLog error, projectCode:{}, taskDefinitionCode:{}.",
                    projectCode, taskCode);
            putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
            throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
        }
        log.info(
                "Update task definition and definitionLog complete, projectCode:{}, taskDefinitionCode:{}, newTaskVersion:{}.",
                projectCode, taskCode, taskDefinitionToUpdate.getVersion());

        // update workflow task relation: look up the relations that still reference the previous task version
        List<WorkflowTaskRelation> workflowTaskRelations = workflowTaskRelationMapper
                .queryWorkflowTaskRelationByTaskCodeAndTaskVersion(taskDefinitionToUpdate.getCode(),
                        taskDefinition.getVersion());
        if (CollectionUtils.isNotEmpty(workflowTaskRelations)) {
            Map<Long, List<WorkflowTaskRelation>> workflowTaskRelationGroupList = workflowTaskRelations.stream()
                    .collect(Collectors.groupingBy(WorkflowTaskRelation::getWorkflowDefinitionCode));
            for (Map.Entry<Long, List<WorkflowTaskRelation>> workflowTaskRelationMap : workflowTaskRelationGroupList
                    .entrySet()) {
                Long workflowDefinitionCode = workflowTaskRelationMap.getKey();
                Integer maxWorkflowDefinitionVersion =
                        workflowDefinitionLogMapper.queryMaxVersionForDefinition(workflowDefinitionCode);
                if (maxWorkflowDefinitionVersion == null) {
                    // Fail with a service error instead of a NullPointerException from unboxing.
                    log.error(
                            "Max version workflow definitionLog can not be found in database, projectCode:{}, workflowDefinitionCode:{}.",
                            projectCode, workflowDefinitionCode);
                    putMsg(result, Status.UPDATE_WORKFLOW_DEFINITION_ERROR);
                    throw new ServiceException(Status.UPDATE_WORKFLOW_DEFINITION_ERROR);
                }
                int workflowDefinitionVersion = maxWorkflowDefinitionVersion + 1;

                List<WorkflowTaskRelation> workflowTaskRelationList = workflowTaskRelationMap.getValue();
                for (WorkflowTaskRelation workflowTaskRelation : workflowTaskRelationList) {
                    // Point whichever side of the edge is this task at the new task version.
                    if (taskCode == workflowTaskRelation.getPreTaskCode()) {
                        workflowTaskRelation.setPreTaskVersion(newTaskVersion);
                    } else if (taskCode == workflowTaskRelation.getPostTaskCode()) {
                        workflowTaskRelation.setPostTaskVersion(newTaskVersion);
                    }
                    workflowTaskRelation.setWorkflowDefinitionVersion(workflowDefinitionVersion);
                    int updateWorkflowDefinitionVersionCount =
                            workflowTaskRelationMapper.updateWorkflowTaskRelationTaskVersion(workflowTaskRelation);
                    if (updateWorkflowDefinitionVersionCount != 1) {
                        log.error("batch update workflow task relation error, projectCode:{}, taskDefinitionCode:{}.",
                                projectCode, taskCode);
                        putMsg(result, Status.WORKFLOW_TASK_RELATION_BATCH_UPDATE_ERROR);
                        throw new ServiceException(Status.WORKFLOW_TASK_RELATION_BATCH_UPDATE_ERROR);
                    }
                    WorkflowTaskRelationLog workflowTaskRelationLog = new WorkflowTaskRelationLog(workflowTaskRelation);
                    workflowTaskRelationLog.setOperator(loginUser.getId());
                    workflowTaskRelationLog.setId(null);
                    workflowTaskRelationLog.setOperateTime(now);
                    int insertWorkflowTaskRelationLogCount = workflowTaskRelationLogDao.insert(workflowTaskRelationLog);
                    if (insertWorkflowTaskRelationLogCount != 1) {
                        log.error("Insert workflow task relation log error, projectCode:{}, taskDefinitionCode:{}.",
                                projectCode, taskCode);
                        putMsg(result, Status.CREATE_WORKFLOW_TASK_RELATION_LOG_ERROR);
                        throw new ServiceException(Status.CREATE_WORKFLOW_TASK_RELATION_LOG_ERROR);
                    }
                }

                WorkflowDefinition workflowDefinition = workflowDefinitionMapper.queryByCode(workflowDefinitionCode);
                if (workflowDefinition == null) {
                    // A relation references a workflow definition that no longer exists.
                    log.error(
                            "Workflow definition does not exist, projectCode:{}, workflowDefinitionCode:{}.",
                            projectCode, workflowDefinitionCode);
                    putMsg(result, Status.UPDATE_WORKFLOW_DEFINITION_ERROR);
                    throw new ServiceException(Status.UPDATE_WORKFLOW_DEFINITION_ERROR);
                }
                workflowDefinition.setVersion(workflowDefinitionVersion);
                workflowDefinition.setUpdateTime(now);
                workflowDefinition.setUserId(loginUser.getId());
                // update workflow definition
                int updateWorkflowDefinitionCount = workflowDefinitionMapper.updateById(workflowDefinition);
                WorkflowDefinitionLog workflowDefinitionLog = new WorkflowDefinitionLog(workflowDefinition);
                workflowDefinitionLog.setOperateTime(now);
                workflowDefinitionLog.setId(null);
                workflowDefinitionLog.setOperator(loginUser.getId());
                int insertWorkflowDefinitionLogCount = workflowDefinitionLogMapper.insert(workflowDefinitionLog);
                if (updateWorkflowDefinitionCount != 1 || insertWorkflowDefinitionLogCount != 1) {
                    log.error(
                            "Update workflow definition or definitionLog error, projectCode:{}, workflowDefinitionCode:{}.",
                            projectCode, workflowDefinitionCode);
                    putMsg(result, Status.UPDATE_WORKFLOW_DEFINITION_ERROR);
                    throw new ServiceException(Status.UPDATE_WORKFLOW_DEFINITION_ERROR);
                }
            }
        }
        return taskDefinitionToUpdate;
    }