in dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java [345:479]
private TaskDefinitionLog updateTask(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj,
Map<String, Object> result) {
Project project = projectMapper.queryByCode(projectCode);
// check if user have write perm for project
boolean hasProjectAndWritePerm = projectService.hasProjectAndWritePerm(loginUser, project, result);
if (!hasProjectAndWritePerm) {
return null;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition == null) {
log.error("Task definition does not exist, taskDefinitionCode:{}.", taskCode);
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode));
return null;
}
if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) {
// if stream, can update task definition without online check
if (taskDefinition.getTaskExecuteType() != TaskExecuteType.STREAM) {
log.warn("Only {} type task can be updated without online check, taskDefinitionCode:{}.",
TaskExecuteType.STREAM, taskCode);
putMsg(result, Status.NOT_SUPPORT_UPDATE_TASK_DEFINITION);
return null;
}
}
TaskDefinitionLog taskDefinitionToUpdate =
JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
if (TimeoutFlag.CLOSE == taskDefinition.getTimeoutFlag()) {
taskDefinition.setTimeoutNotifyStrategy(null);
}
if (taskDefinition.equals(taskDefinitionToUpdate)) {
log.warn("Task definition does not need update because no change, taskDefinitionCode:{}.", taskCode);
putMsg(result, Status.TASK_DEFINITION_NOT_MODIFY_ERROR, String.valueOf(taskCode));
return null;
}
if (taskDefinitionToUpdate == null) {
log.warn("Parameter taskDefinitionJson is invalid.");
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj);
return null;
}
if (!checkTaskParameters(taskDefinitionToUpdate.getTaskType(), taskDefinitionToUpdate.getTaskParams())) {
putMsg(result, Status.WORKFLOW_NODE_S_PARAMETER_INVALID, taskDefinitionToUpdate.getName());
return null;
}
Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode);
if (version == null || version == 0) {
log.error("Max version task definitionLog can not be found in database, taskDefinitionCode:{}.",
taskCode);
putMsg(result, Status.DATA_IS_NOT_VALID, taskCode);
return null;
}
Date now = new Date();
taskDefinitionToUpdate.setCode(taskCode);
taskDefinitionToUpdate.setId(taskDefinition.getId());
taskDefinitionToUpdate.setProjectCode(projectCode);
taskDefinitionToUpdate.setUserId(taskDefinition.getUserId());
taskDefinitionToUpdate.setVersion(++version);
taskDefinitionToUpdate.setTaskType(taskDefinitionToUpdate.getTaskType().toUpperCase());
taskDefinitionToUpdate.setResourceIds(processService.getResourceIds(taskDefinitionToUpdate));
taskDefinitionToUpdate.setUpdateTime(now);
int update = taskDefinitionMapper.updateById(taskDefinitionToUpdate);
taskDefinitionToUpdate.setOperator(loginUser.getId());
taskDefinitionToUpdate.setOperateTime(now);
taskDefinitionToUpdate.setCreateTime(now);
taskDefinitionToUpdate.setId(null);
int insert = taskDefinitionLogMapper.insert(taskDefinitionToUpdate);
if ((update & insert) != 1) {
log.error("Update task definition or definitionLog error, projectCode:{}, taskDefinitionCode:{}.",
projectCode, taskCode);
putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
} else
log.info(
"Update task definition and definitionLog complete, projectCode:{}, taskDefinitionCode:{}, newTaskVersion:{}.",
projectCode, taskCode, taskDefinitionToUpdate.getVersion());
// update workflow task relation
List<WorkflowTaskRelation> workflowTaskRelations = workflowTaskRelationMapper
.queryWorkflowTaskRelationByTaskCodeAndTaskVersion(taskDefinitionToUpdate.getCode(),
taskDefinition.getVersion());
if (CollectionUtils.isNotEmpty(workflowTaskRelations)) {
Map<Long, List<WorkflowTaskRelation>> workflowTaskRelationGroupList = workflowTaskRelations.stream()
.collect(Collectors.groupingBy(WorkflowTaskRelation::getWorkflowDefinitionCode));
for (Map.Entry<Long, List<WorkflowTaskRelation>> workflowTaskRelationMap : workflowTaskRelationGroupList
.entrySet()) {
Long workflowDefinitionCode = workflowTaskRelationMap.getKey();
int workflowDefinitionVersion =
workflowDefinitionLogMapper.queryMaxVersionForDefinition(workflowDefinitionCode)
+ 1;
List<WorkflowTaskRelation> workflowTaskRelationList = workflowTaskRelationMap.getValue();
for (WorkflowTaskRelation workflowTaskRelation : workflowTaskRelationList) {
if (taskCode == workflowTaskRelation.getPreTaskCode()) {
workflowTaskRelation.setPreTaskVersion(version);
} else if (taskCode == workflowTaskRelation.getPostTaskCode()) {
workflowTaskRelation.setPostTaskVersion(version);
}
workflowTaskRelation.setWorkflowDefinitionVersion(workflowDefinitionVersion);
int updateWorkflowDefinitionVersionCount =
workflowTaskRelationMapper.updateWorkflowTaskRelationTaskVersion(workflowTaskRelation);
if (updateWorkflowDefinitionVersionCount != 1) {
log.error("batch update workflow task relation error, projectCode:{}, taskDefinitionCode:{}.",
projectCode, taskCode);
putMsg(result, Status.WORKFLOW_TASK_RELATION_BATCH_UPDATE_ERROR);
throw new ServiceException(Status.WORKFLOW_TASK_RELATION_BATCH_UPDATE_ERROR);
}
WorkflowTaskRelationLog workflowTaskRelationLog = new WorkflowTaskRelationLog(workflowTaskRelation);
workflowTaskRelationLog.setOperator(loginUser.getId());
workflowTaskRelationLog.setId(null);
workflowTaskRelationLog.setOperateTime(now);
int insertWorkflowTaskRelationLogCount = workflowTaskRelationLogDao.insert(workflowTaskRelationLog);
if (insertWorkflowTaskRelationLogCount != 1) {
log.error("batch update workflow task relation error, projectCode:{}, taskDefinitionCode:{}.",
projectCode, taskCode);
putMsg(result, Status.CREATE_WORKFLOW_TASK_RELATION_LOG_ERROR);
throw new ServiceException(Status.CREATE_WORKFLOW_TASK_RELATION_LOG_ERROR);
}
}
WorkflowDefinition workflowDefinition = workflowDefinitionMapper.queryByCode(workflowDefinitionCode);
workflowDefinition.setVersion(workflowDefinitionVersion);
workflowDefinition.setUpdateTime(now);
workflowDefinition.setUserId(loginUser.getId());
// update workflow definition
int updateWorkflowDefinitionCount = workflowDefinitionMapper.updateById(workflowDefinition);
WorkflowDefinitionLog workflowDefinitionLog = new WorkflowDefinitionLog(workflowDefinition);
workflowDefinitionLog.setOperateTime(now);
workflowDefinitionLog.setId(null);
workflowDefinitionLog.setOperator(loginUser.getId());
int insertWorkflowDefinitionLogCount = workflowDefinitionLogMapper.insert(workflowDefinitionLog);
if ((updateWorkflowDefinitionCount & insertWorkflowDefinitionLogCount) != 1) {
putMsg(result, Status.UPDATE_WORKFLOW_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_WORKFLOW_DEFINITION_ERROR);
}
}
}
return taskDefinitionToUpdate;
}