in dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowTaskRelationServiceImpl.java [395:492]
public List<WorkflowTaskRelation> updateUpstreamTaskDefinitionWithSyncDag(User loginUser,
long taskCode,
Boolean needSyncDag,
TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest) {
TaskDefinition downstreamTask = taskDefinitionMapper.queryByCode(taskCode);
if (downstreamTask == null) {
throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, taskCode);
}
List<Long> upstreamTaskCodes = taskRelationUpdateUpstreamRequest.getUpstreams();
WorkflowTaskRelation workflowTaskRelation = new WorkflowTaskRelation();
workflowTaskRelation.setPostTaskCode(taskCode);
Page<WorkflowTaskRelation> page = new Page<>(taskRelationUpdateUpstreamRequest.getPageNo(),
taskRelationUpdateUpstreamRequest.getPageSize());
IPage<WorkflowTaskRelation> workflowTaskRelationExistsIPage =
workflowTaskRelationMapper.filterWorkflowTaskRelation(page, workflowTaskRelation);
List<WorkflowTaskRelation> workflowTaskRelationExists = workflowTaskRelationExistsIPage.getRecords();
WorkflowDefinition workflowDefinition = null;
if (CollectionUtils.isNotEmpty(workflowTaskRelationExists)) {
workflowDefinition =
workflowDefinitionMapper.queryByCode(workflowTaskRelationExists.get(0).getWorkflowDefinitionCode());
} else if (taskRelationUpdateUpstreamRequest.getWorkflowCode() != 0L) {
workflowDefinition =
workflowDefinitionMapper.queryByCode(taskRelationUpdateUpstreamRequest.getWorkflowCode());
}
if (workflowDefinition == null) {
throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR,
taskRelationUpdateUpstreamRequest.toString());
}
workflowDefinition.setUpdateTime(new Date());
int insertVersion = workflowDefinition.getVersion();
if (needSyncDag) {
insertVersion =
this.saveWorkflowDefinition(loginUser, workflowDefinition);
if (insertVersion <= 0) {
throw new ServiceException(Status.UPDATE_WORKFLOW_DEFINITION_ERROR);
}
}
// get new relation to create and out of date relation to delete
List<Long> taskCodeCreates = upstreamTaskCodes
.stream()
.filter(upstreamTaskCode -> workflowTaskRelationExists.stream().noneMatch(
workflowTaskRelation1 -> workflowTaskRelation1.getPreTaskCode() == upstreamTaskCode))
.collect(Collectors.toList());
List<Integer> taskCodeDeletes = workflowTaskRelationExists.stream()
.filter(ptr -> !upstreamTaskCodes.contains(ptr.getPreTaskCode()))
.map(WorkflowTaskRelation::getId)
.collect(Collectors.toList());
// delete relation not exists
if (CollectionUtils.isNotEmpty(taskCodeDeletes)) {
int delete = workflowTaskRelationMapper.deleteBatchIds(taskCodeDeletes);
if (delete != taskCodeDeletes.size()) {
throw new ServiceException(Status.WORKFLOW_TASK_RELATION_BATCH_DELETE_ERROR, taskCodeDeletes);
}
}
// create relation not exists
List<WorkflowTaskRelation> workflowTaskRelations = new ArrayList<>();
for (long createCode : taskCodeCreates) {
long upstreamCode = 0L;
int version = 0;
if (createCode != 0L) {
// 0 for DAG root, should not, it may already exists and skip to create anymore
TaskDefinition upstreamTask = taskDefinitionMapper.queryByCode(createCode);
if (upstreamTask == null) {
throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, createCode);
}
upstreamCode = upstreamTask.getCode();
version = upstreamTask.getVersion();
}
WorkflowTaskRelation workflowTaskRelationCreate =
new WorkflowTaskRelation(null, workflowDefinition.getVersion(), downstreamTask.getProjectCode(),
workflowDefinition.getCode(), upstreamCode, version,
downstreamTask.getCode(), downstreamTask.getVersion(), null, null);
workflowTaskRelations.add(workflowTaskRelationCreate);
}
int batchInsert = workflowTaskRelationMapper.batchInsert(workflowTaskRelations);
if (batchInsert != workflowTaskRelations.size()) {
throw new ServiceException(Status.WORKFLOW_TASK_RELATION_BATCH_CREATE_ERROR, taskCodeCreates);
}
// batch sync to workflow task relation log
int saveTaskRelationResult = saveTaskRelation(loginUser, workflowDefinition, insertVersion);
if (saveTaskRelationResult != Constants.EXIT_CODE_SUCCESS) {
log.error(
"Save workflow task relations error, projectCode:{}, workflowDefinitionCode:{}, workflowDefinitionVersion:{}.",
workflowDefinition.getProjectCode(), workflowDefinition.getCode(), insertVersion);
throw new ServiceException(Status.CREATE_WORKFLOW_TASK_RELATION_ERROR);
}
log.info(
"Save workflow task relations complete, projectCode:{}, workflowDefinitionCode:{}, workflowDefinitionVersion:{}.",
workflowDefinition.getProjectCode(), workflowDefinition.getCode(), insertVersion);
workflowTaskRelations.get(0).setWorkflowDefinitionVersion(insertVersion);
return workflowTaskRelations;
}