in dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowTaskRelationServiceImpl.java [607:681]
public Map<String, Object> deleteUpstreamRelation(User loginUser, long projectCode, String preTaskCodes,
long taskCode) {
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, null);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
if (StringUtils.isEmpty(preTaskCodes)) {
log.warn("Parameter preTaskCodes is empty.");
putMsg(result, Status.DATA_IS_NULL, "preTaskCodes");
return result;
}
List<WorkflowTaskRelation> upstreamList = workflowTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
if (CollectionUtils.isEmpty(upstreamList)) {
log.error("Upstream tasks based on the task do not exist, theTaskDefinitionCode:{}.", taskCode);
putMsg(result, Status.DATA_IS_NULL, "taskCode");
return result;
}
List<Long> preTaskCodeList = Lists.newArrayList(preTaskCodes.split(Constants.COMMA)).stream()
.map(Long::parseLong).collect(Collectors.toList());
if (preTaskCodeList.contains(0L)) {
log.warn("Parameter preTaskCodes contain 0.");
putMsg(result, Status.DATA_IS_NULL, "preTaskCodes");
return result;
}
List<Long> currentUpstreamList =
upstreamList.stream().map(WorkflowTaskRelation::getPreTaskCode).collect(Collectors.toList());
if (currentUpstreamList.contains(0L)) {
log.error("Upstream taskCodes based on the task contain, theTaskDefinitionCode:{}.", taskCode);
putMsg(result, Status.DATA_IS_NOT_VALID, "currentUpstreamList");
return result;
}
List<Long> tmpCurrent = Lists.newArrayList(currentUpstreamList);
tmpCurrent.removeAll(preTaskCodeList);
preTaskCodeList.removeAll(currentUpstreamList);
if (!preTaskCodeList.isEmpty()) {
String invalidPreTaskCodes = StringUtils.join(preTaskCodeList, Constants.COMMA);
log.error("Some upstream taskCodes are invalid, preTaskCodeList:{}.", invalidPreTaskCodes);
putMsg(result, Status.DATA_IS_NOT_VALID, invalidPreTaskCodes);
return result;
}
WorkflowDefinition workflowDefinition =
workflowDefinitionMapper.queryByCode(upstreamList.get(0).getWorkflowDefinitionCode());
if (workflowDefinition == null) {
log.error("workflow definition does not exist, workflowDefinitionCode:{}.",
upstreamList.get(0).getWorkflowDefinitionCode());
putMsg(result, Status.WORKFLOW_DEFINITION_NOT_EXIST,
String.valueOf(upstreamList.get(0).getWorkflowDefinitionCode()));
return result;
}
List<WorkflowTaskRelation> workflowTaskRelations =
workflowTaskRelationMapper.queryByWorkflowDefinitionCode(workflowDefinition.getCode());
List<WorkflowTaskRelation> workflowTaskRelationList = Lists.newArrayList(workflowTaskRelations);
List<WorkflowTaskRelation> workflowTaskRelationWaitRemove = Lists.newArrayList();
for (WorkflowTaskRelation workflowTaskRelation : workflowTaskRelationList) {
if (currentUpstreamList.size() > 1) {
if (currentUpstreamList.contains(workflowTaskRelation.getPreTaskCode())) {
currentUpstreamList.remove(workflowTaskRelation.getPreTaskCode());
workflowTaskRelationWaitRemove.add(workflowTaskRelation);
}
} else {
if (workflowTaskRelation.getPostTaskCode() == taskCode
&& (currentUpstreamList.isEmpty() || tmpCurrent.isEmpty())) {
workflowTaskRelation.setPreTaskVersion(0);
workflowTaskRelation.setPreTaskCode(0L);
}
}
}
workflowTaskRelationList.removeAll(workflowTaskRelationWaitRemove);
updateWorkflowDefiniteVersion(loginUser, result, workflowDefinition);
updateRelation(loginUser, result, workflowDefinition, workflowTaskRelationList);
return result;
}