in dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java [492:566]
public Map<String, Object> updateTaskWithUpstream(User loginUser, long projectCode, long taskCode,
String taskDefinitionJsonObj, String upstreamCodes) {
Map<String, Object> result = new HashMap<>();
TaskDefinitionLog taskDefinitionToUpdate =
updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj, result);
List<WorkflowTaskRelation> upstreamTaskRelations =
workflowTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
Set<Long> upstreamCodeSet =
upstreamTaskRelations.stream().map(WorkflowTaskRelation::getPreTaskCode).collect(Collectors.toSet());
Set<Long> upstreamTaskCodes = Collections.emptySet();
if (StringUtils.isNotEmpty(upstreamCodes)) {
upstreamTaskCodes = Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong)
.collect(Collectors.toSet());
}
if (CollectionUtils.isEqualCollection(upstreamCodeSet, upstreamTaskCodes) && taskDefinitionToUpdate == null) {
putMsg(result, Status.SUCCESS);
return result;
}
Map<Long, TaskDefinition> queryUpStreamTaskCodeMap;
if (CollectionUtils.isNotEmpty(upstreamTaskCodes)) {
List<TaskDefinition> upstreamTaskDefinitionList = taskDefinitionMapper.queryByCodeList(upstreamTaskCodes);
queryUpStreamTaskCodeMap = upstreamTaskDefinitionList.stream()
.collect(Collectors.toMap(TaskDefinition::getCode, taskDefinition -> taskDefinition));
// upstreamTaskCodes - queryUpStreamTaskCodeMap.keySet
upstreamTaskCodes.removeAll(queryUpStreamTaskCodeMap.keySet());
if (CollectionUtils.isNotEmpty(upstreamTaskCodes)) {
String notExistTaskCodes = StringUtils.join(upstreamTaskCodes, Constants.COMMA);
log.error("Some task definitions in parameter upstreamTaskCodes do not exist, notExistTaskCodes:{}.",
notExistTaskCodes);
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, notExistTaskCodes);
return result;
}
} else {
queryUpStreamTaskCodeMap = new HashMap<>();
}
if (MapUtils.isNotEmpty(queryUpStreamTaskCodeMap)) {
WorkflowTaskRelation taskRelation = upstreamTaskRelations.get(0);
List<WorkflowTaskRelation> workflowTaskRelations =
workflowTaskRelationMapper.queryByWorkflowDefinitionCode(taskRelation.getWorkflowDefinitionCode());
// set upstream code list
updateUpstreamTask(new HashSet<>(queryUpStreamTaskCodeMap.keySet()),
taskCode, projectCode, taskRelation.getWorkflowDefinitionCode(), loginUser);
List<WorkflowTaskRelation> workflowTaskRelationList = Lists.newArrayList(workflowTaskRelations);
List<WorkflowTaskRelation> relationList = Lists.newArrayList();
for (WorkflowTaskRelation workflowTaskRelation : workflowTaskRelationList) {
if (workflowTaskRelation.getPostTaskCode() == taskCode) {
if (queryUpStreamTaskCodeMap.containsKey(workflowTaskRelation.getPreTaskCode())
&& workflowTaskRelation.getPreTaskCode() != 0L) {
queryUpStreamTaskCodeMap.remove(workflowTaskRelation.getPreTaskCode());
} else {
workflowTaskRelation.setPreTaskCode(0L);
workflowTaskRelation.setPreTaskVersion(0);
relationList.add(workflowTaskRelation);
}
}
}
workflowTaskRelationList.removeAll(relationList);
for (Map.Entry<Long, TaskDefinition> queryUpStreamTask : queryUpStreamTaskCodeMap.entrySet()) {
taskRelation.setPreTaskCode(queryUpStreamTask.getKey());
taskRelation.setPreTaskVersion(queryUpStreamTask.getValue().getVersion());
workflowTaskRelationList.add(taskRelation);
}
if (MapUtils.isEmpty(queryUpStreamTaskCodeMap) && CollectionUtils.isNotEmpty(workflowTaskRelationList)) {
workflowTaskRelationList.add(workflowTaskRelationList.get(0));
}
}
log.info(
"Update task with upstream tasks complete, projectCode:{}, taskDefinitionCode:{}, upstreamTaskCodes:{}.",
projectCode, taskCode, upstreamTaskCodes);
result.put(Constants.DATA_LIST, taskCode);
putMsg(result, Status.SUCCESS);
return result;
}