in dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowTaskRelationServiceImpl.java [114:175]
public Map<String, Object> createWorkflowTaskRelation(User loginUser, long projectCode, long workflowDefinitionCode,
long preTaskCode, long postTaskCode) {
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, null);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
WorkflowDefinition workflowDefinition = workflowDefinitionMapper.queryByCode(workflowDefinitionCode);
if (workflowDefinition == null) {
log.error("workflow definition does not exist, workflowDefinitionCode:{}.", workflowDefinitionCode);
putMsg(result, Status.WORKFLOW_DEFINITION_NOT_EXIST, String.valueOf(workflowDefinitionCode));
return result;
}
if (workflowDefinition.getProjectCode() != projectCode) {
log.error("workflow definition's project does not match project {}.", projectCode);
putMsg(result, Status.PROJECT_WORKFLOW_NOT_MATCH);
return result;
}
updateWorkflowDefiniteVersion(loginUser, result, workflowDefinition);
List<WorkflowTaskRelation> workflowTaskRelationList =
workflowTaskRelationMapper.queryByWorkflowDefinitionCode(workflowDefinitionCode);
List<WorkflowTaskRelation> workflowTaskRelations = Lists.newArrayList(workflowTaskRelationList);
if (!workflowTaskRelations.isEmpty()) {
Map<Long, WorkflowTaskRelation> preTaskCodeMap =
workflowTaskRelations.stream().filter(r -> r.getPostTaskCode() == postTaskCode)
.collect(Collectors.toMap(WorkflowTaskRelation::getPreTaskCode,
workflowTaskRelation -> workflowTaskRelation));
if (!preTaskCodeMap.isEmpty()) {
if (preTaskCodeMap.containsKey(preTaskCode) || (!preTaskCodeMap.containsKey(0L) && preTaskCode == 0L)) {
putMsg(result, Status.WORKFLOW_TASK_RELATION_EXIST, String.valueOf(workflowDefinitionCode));
return result;
}
if (preTaskCodeMap.containsKey(0L) && preTaskCode != 0L) {
// delete no upstream
workflowTaskRelations.remove(preTaskCodeMap.get(0L));
}
}
}
TaskDefinition postTaskDefinition = taskDefinitionMapper.queryByCode(postTaskCode);
WorkflowTaskRelation workflowTaskRelation = setRelation(workflowDefinition, postTaskDefinition);
if (preTaskCode != 0L) {
TaskDefinition preTaskDefinition = taskDefinitionMapper.queryByCode(preTaskCode);
List<WorkflowTaskRelation> upstreamTaskRelationList = workflowTaskRelations.stream()
.filter(r -> r.getPostTaskCode() == preTaskCode).collect(Collectors.toList());
// upstream is or not exist
if (upstreamTaskRelationList.isEmpty()) {
WorkflowTaskRelation preWorkflowTaskRelation = setRelation(workflowDefinition, preTaskDefinition);
preWorkflowTaskRelation.setPreTaskCode(0L);
preWorkflowTaskRelation.setPreTaskVersion(0);
workflowTaskRelations.add(preWorkflowTaskRelation);
}
workflowTaskRelation.setPreTaskCode(preTaskDefinition.getCode());
workflowTaskRelation.setPreTaskVersion(preTaskDefinition.getVersion());
} else {
workflowTaskRelation.setPreTaskCode(0L);
workflowTaskRelation.setPreTaskVersion(0);
}
workflowTaskRelations.add(workflowTaskRelation);
updateRelation(loginUser, result, workflowDefinition, workflowTaskRelations);
return result;
}