public Map updateWorkflowInstance()

in dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java [641:760]


    /**
     * Updates a finished workflow instance together with the workflow definition it points to.
     *
     * <p>The steps run in this order: authorize the user on the project, load the instance and its
     * workflow definition, require the instance state to be finished, hand the schedule time, global
     * params, timeout and timezone to {@code setWorkflowInstance}, save the task definitions, validate
     * the relation JSON, save the workflow definition, optionally save the lineage, save the task
     * relations, and finally store the returned version on the instance.
     *
     * <p>Validation failures are reported through the returned map; persistence failures are reported
     * by throwing {@link ServiceException}.
     *
     * @param loginUser          user performing the update
     * @param projectCode        code of the project the instance must belong to
     * @param workflowInstanceId id of the workflow instance to update
     * @param taskRelationJson   JSON parsed into a list of {@code WorkflowTaskRelationLog}
     * @param taskDefinitionJson JSON parsed into a list of {@code TaskDefinitionLog}; must not parse to an
     *                           empty list
     * @param scheduleTime       schedule time forwarded to {@code setWorkflowInstance}
     * @param syncDefine         forwarded unchanged to the save calls; the lineage is saved only when it
     *                           is {@code Boolean.TRUE} ({@code null} is treated as "do not save lineage")
     * @param globalParams       global parameters applied to the instance and the definition
     * @param locations          locations value applied to the definition
     * @param timeout            timeout applied to the instance and the definition
     * @return result map; on success it carries {@code Status.SUCCESS} and the workflow definition under
     *         {@code DATA_LIST}, otherwise the status describing the validation failure
     * @throws ServiceException if the instance does not exist, or if saving the task definitions, the
     *                          workflow definition, the task relations or the instance fails
     */
    public Map<String, Object> updateWorkflowInstance(User loginUser, long projectCode, Integer workflowInstanceId,
                                                      String taskRelationJson,
                                                      String taskDefinitionJson, String scheduleTime,
                                                      Boolean syncDefine,
                                                      String globalParams,
                                                      String locations, int timeout) {
        // check user access for project
        projectService.checkProjectAndAuthThrowException(loginUser, projectCode,
                ApiFuncIdentificationConstant.INSTANCE_UPDATE);
        Map<String, Object> result = new HashMap<>();
        // check workflow instance exists
        WorkflowInstance workflowInstance = processService.findWorkflowInstanceDetailById(workflowInstanceId)
                .orElseThrow(() -> new ServiceException(WORKFLOW_INSTANCE_NOT_EXIST, workflowInstanceId));
        // check workflow instance exists in project; a missing definition is rejected here as well,
        // because the definition is dereferenced further down and would otherwise fail with a
        // NullPointerException after the task definitions have already been saved
        WorkflowDefinition workflowDefinition =
                workflowDefinitionMapper.queryByCode(workflowInstance.getWorkflowDefinitionCode());
        if (workflowDefinition == null || projectCode != workflowDefinition.getProjectCode()) {
            log.error("workflow definition does not exist, projectCode:{}, workflowDefinitionCode:{}.", projectCode,
                    workflowInstance.getWorkflowDefinitionCode());
            putMsg(result, WORKFLOW_INSTANCE_NOT_EXIST, workflowInstanceId);
            return result;
        }
        // check workflow instance status: only finished instances may be updated
        if (!workflowInstance.getState().isFinished()) {
            log.warn("workflow Instance state is {} so can not update workflow instance, workflowInstanceId:{}.",
                    workflowInstance.getState().getDesc(), workflowInstanceId);
            putMsg(result, WORKFLOW_INSTANCE_STATE_OPERATION_ERROR,
                    workflowInstance.getName(), workflowInstance.getState().toString(), "update");
            return result;
        }

        // resolve the timezone: prefer the schedule timezone stored in the instance's command param,
        // fall back to the login user's timezone when it is absent or blank
        Map<String, String> commandParamMap = JSONUtils.toMap(workflowInstance.getCommandParam());
        String scheduleTimezone =
                commandParamMap == null ? null : commandParamMap.get(Constants.SCHEDULE_TIMEZONE);
        String timezoneId = StringUtils.isBlank(scheduleTimezone) ? loginUser.getTimeZone() : scheduleTimezone;

        setWorkflowInstance(workflowInstance, scheduleTime, globalParams, timeout, timezoneId);
        List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
        if (taskDefinitionLogs.isEmpty()) {
            log.warn("Parameter taskDefinitionJson is empty");
            putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
            return result;
        }
        for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
            if (!checkTaskParameters(taskDefinitionLog.getTaskType(), taskDefinitionLog.getTaskParams())) {
                log.error("Task parameters are invalid,  taskDefinitionName:{}.", taskDefinitionLog.getName());
                putMsg(result, Status.WORKFLOW_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName());
                return result;
            }
        }
        int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs, syncDefine);
        if (saveTaskResult == Constants.DEFINITION_FAILURE) {
            log.error("Update task definition error, projectCode:{}, workflowInstanceId:{}", projectCode,
                    workflowInstanceId);
            putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
            throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
        }
        List<WorkflowTaskRelationLog> taskRelationList =
                JSONUtils.toList(taskRelationJson, WorkflowTaskRelationLog.class);
        // check workflow json is valid
        // NOTE(review): this validation runs after the task definitions were saved and returns without
        // throwing; confirm whether it should run before saveTaskDefine
        result = workflowDefinitionService.checkWorkflowNodeList(taskRelationJson, taskDefinitionLogs);
        if (result.get(Constants.STATUS) != Status.SUCCESS) {
            return result;
        }

        // the definition loaded for the project check above is reused here instead of being queried again
        workflowDefinition.set(projectCode, workflowDefinition.getName(), workflowDefinition.getDescription(),
                globalParams, locations, timeout);
        workflowDefinition.setUpdateTime(new Date());
        // a return value of 0 signals failure; any other value is used as the definition version below
        int insertVersion = processService.saveWorkflowDefine(loginUser, workflowDefinition, syncDefine, Boolean.FALSE);
        if (insertVersion == 0) {
            log.error("Update workflow definition error, projectCode:{}, workflowDefinitionName:{}.", projectCode,
                    workflowDefinition.getName());
            putMsg(result, Status.UPDATE_WORKFLOW_DEFINITION_ERROR);
            throw new ServiceException(Status.UPDATE_WORKFLOW_DEFINITION_ERROR);
        } else {
            log.info("Update workflow definition complete, projectCode:{}, workflowDefinitionName:{}.", projectCode,
                    workflowDefinition.getName());
        }

        // save workflow lineage; Boolean.TRUE.equals avoids unboxing a null syncDefine
        if (Boolean.TRUE.equals(syncDefine)) {
            workflowDefinitionService.saveWorkflowLineage(projectCode, workflowDefinition.getCode(),
                    insertVersion, taskDefinitionLogs);
        }

        int insertResult = processService.saveTaskRelation(loginUser, workflowDefinition.getProjectCode(),
                workflowDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, syncDefine);
        if (insertResult != Constants.EXIT_CODE_SUCCESS) {
            log.error(
                    "Update task relations error, projectCode:{}, workflowDefinitionCode:{}, workflowDefinitionVersion:{}.",
                    projectCode, workflowDefinition.getCode(), insertVersion);
            putMsg(result, Status.UPDATE_WORKFLOW_DEFINITION_ERROR);
            throw new ServiceException(Status.UPDATE_WORKFLOW_DEFINITION_ERROR);
        }
        log.info(
                "Update task relations complete, projectCode:{}, workflowDefinitionCode:{}, workflowDefinitionVersion:{}.",
                projectCode, workflowDefinition.getCode(), insertVersion);

        // point the instance at the version returned by saveWorkflowDefine
        workflowInstance.setWorkflowDefinitionVersion(insertVersion);
        boolean update = workflowInstanceDao.updateById(workflowInstance);
        if (!update) {
            log.error(
                    "Update workflow instance version error, projectCode:{}, workflowDefinitionCode:{}, workflowDefinitionVersion:{}",
                    projectCode, workflowDefinition.getCode(), insertVersion);
            putMsg(result, Status.UPDATE_WORKFLOW_INSTANCE_ERROR);
            throw new ServiceException(Status.UPDATE_WORKFLOW_INSTANCE_ERROR);
        }
        log.info(
                "Update workflow instance complete, projectCode:{}, workflowDefinitionCode:{}, workflowDefinitionVersion:{}, workflowInstanceId:{}",
                projectCode, workflowDefinition.getCode(), insertVersion, workflowInstanceId);
        putMsg(result, Status.SUCCESS);
        result.put(DATA_LIST, workflowDefinition);
        return result;
    }