in dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java [641:760]
public Map<String, Object> updateWorkflowInstance(User loginUser, long projectCode, Integer workflowInstanceId,
String taskRelationJson,
String taskDefinitionJson, String scheduleTime,
Boolean syncDefine,
String globalParams,
String locations, int timeout) {
// check user access for project
projectService.checkProjectAndAuthThrowException(loginUser, projectCode,
ApiFuncIdentificationConstant.INSTANCE_UPDATE);
Map<String, Object> result = new HashMap<>();
// check workflow instance exists
WorkflowInstance workflowInstance = processService.findWorkflowInstanceDetailById(workflowInstanceId)
.orElseThrow(() -> new ServiceException(WORKFLOW_INSTANCE_NOT_EXIST, workflowInstanceId));
// check workflow instance exists in project
WorkflowDefinition workflowDefinition0 =
workflowDefinitionMapper.queryByCode(workflowInstance.getWorkflowDefinitionCode());
if (workflowDefinition0 != null && projectCode != workflowDefinition0.getProjectCode()) {
log.error("workflow definition does not exist, projectCode:{}, workflowDefinitionCode:{}.", projectCode,
workflowInstance.getWorkflowDefinitionCode());
putMsg(result, WORKFLOW_INSTANCE_NOT_EXIST, workflowInstanceId);
return result;
}
// check workflow instance status
if (!workflowInstance.getState().isFinished()) {
log.warn("workflow Instance state is {} so can not update workflow instance, workflowInstanceId:{}.",
workflowInstance.getState().getDesc(), workflowInstanceId);
putMsg(result, WORKFLOW_INSTANCE_STATE_OPERATION_ERROR,
workflowInstance.getName(), workflowInstance.getState().toString(), "update");
return result;
}
//
Map<String, String> commandParamMap = JSONUtils.toMap(workflowInstance.getCommandParam());
String timezoneId = null;
if (commandParamMap == null || StringUtils.isBlank(commandParamMap.get(Constants.SCHEDULE_TIMEZONE))) {
timezoneId = loginUser.getTimeZone();
} else {
timezoneId = commandParamMap.get(Constants.SCHEDULE_TIMEZONE);
}
setWorkflowInstance(workflowInstance, scheduleTime, globalParams, timeout, timezoneId);
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
if (taskDefinitionLogs.isEmpty()) {
log.warn("Parameter taskDefinitionJson is empty");
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
return result;
}
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
if (!checkTaskParameters(taskDefinitionLog.getTaskType(), taskDefinitionLog.getTaskParams())) {
log.error("Task parameters are invalid, taskDefinitionName:{}.", taskDefinitionLog.getName());
putMsg(result, Status.WORKFLOW_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName());
return result;
}
}
int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs, syncDefine);
if (saveTaskResult == Constants.DEFINITION_FAILURE) {
log.error("Update task definition error, projectCode:{}, workflowInstanceId:{}", projectCode,
workflowInstanceId);
putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
}
WorkflowDefinition workflowDefinition =
workflowDefinitionMapper.queryByCode(workflowInstance.getWorkflowDefinitionCode());
List<WorkflowTaskRelationLog> taskRelationList =
JSONUtils.toList(taskRelationJson, WorkflowTaskRelationLog.class);
// check workflow json is valid
result = workflowDefinitionService.checkWorkflowNodeList(taskRelationJson, taskDefinitionLogs);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
workflowDefinition.set(projectCode, workflowDefinition.getName(), workflowDefinition.getDescription(),
globalParams, locations, timeout);
workflowDefinition.setUpdateTime(new Date());
int insertVersion = processService.saveWorkflowDefine(loginUser, workflowDefinition, syncDefine, Boolean.FALSE);
if (insertVersion == 0) {
log.error("Update workflow definition error, projectCode:{}, workflowDefinitionName:{}.", projectCode,
workflowDefinition.getName());
putMsg(result, Status.UPDATE_WORKFLOW_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_WORKFLOW_DEFINITION_ERROR);
} else
log.info("Update workflow definition complete, projectCode:{}, workflowDefinitionName:{}.", projectCode,
workflowDefinition.getName());
// save workflow lineage
if (syncDefine) {
workflowDefinitionService.saveWorkflowLineage(projectCode, workflowDefinition.getCode(),
insertVersion, taskDefinitionLogs);
}
int insertResult = processService.saveTaskRelation(loginUser, workflowDefinition.getProjectCode(),
workflowDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, syncDefine);
if (insertResult == Constants.EXIT_CODE_SUCCESS) {
log.info(
"Update task relations complete, projectCode:{}, workflowDefinitionCode:{}, workflowDefinitionVersion:{}.",
projectCode, workflowDefinition.getCode(), insertVersion);
putMsg(result, Status.SUCCESS);
result.put(DATA_LIST, workflowDefinition);
} else {
log.info(
"Update task relations error, projectCode:{}, workflowDefinitionCode:{}, workflowDefinitionVersion:{}.",
projectCode, workflowDefinition.getCode(), insertVersion);
putMsg(result, Status.UPDATE_WORKFLOW_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_WORKFLOW_DEFINITION_ERROR);
}
workflowInstance.setWorkflowDefinitionVersion(insertVersion);
boolean update = workflowInstanceDao.updateById(workflowInstance);
if (!update) {
log.error(
"Update workflow instance version error, projectCode:{}, workflowDefinitionCode:{}, workflowDefinitionVersion:{}",
projectCode, workflowDefinition.getCode(), insertVersion);
putMsg(result, Status.UPDATE_WORKFLOW_INSTANCE_ERROR);
throw new ServiceException(Status.UPDATE_WORKFLOW_INSTANCE_ERROR);
}
log.info(
"Update workflow instance complete, projectCode:{}, workflowDefinitionCode:{}, workflowDefinitionVersion:{}, workflowInstanceId:{}",
projectCode, workflowDefinition.getCode(), insertVersion, workflowInstanceId);
putMsg(result, Status.SUCCESS);
return result;
}