in dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java [1036:1118]
public int saveTaskDefine(User operator, long projectCode, List<TaskDefinitionLog> taskDefinitionLogs,
Boolean syncDefine) {
Date now = new Date();
List<TaskDefinitionLog> newTaskDefinitionLogs = new ArrayList<>();
List<TaskDefinitionLog> updateTaskDefinitionLogs = new ArrayList<>();
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
taskDefinitionLog.setProjectCode(projectCode);
taskDefinitionLog.setUpdateTime(now);
taskDefinitionLog.setOperateTime(now);
taskDefinitionLog.setOperator(operator.getId());
if (taskDefinitionLog.getCode() == 0) {
taskDefinitionLog.setCode(CodeGenerateUtils.genCode());
}
if (taskDefinitionLog.getVersion() == 0) {
// init first version
taskDefinitionLog.setVersion(Constants.VERSION_FIRST);
}
TaskDefinitionLog definitionCodeAndVersion = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
taskDefinitionLog.getCode(), taskDefinitionLog.getVersion());
if (definitionCodeAndVersion == null) {
taskDefinitionLog.setUserId(operator.getId());
taskDefinitionLog.setCreateTime(now);
newTaskDefinitionLogs.add(taskDefinitionLog);
continue;
}
if (taskDefinitionLog.equals(definitionCodeAndVersion)) {
// do nothing if equals
continue;
}
taskDefinitionLog.setUserId(definitionCodeAndVersion.getUserId());
Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinitionLog.getCode());
taskDefinitionLog.setVersion(version + 1);
taskDefinitionLog.setCreateTime(definitionCodeAndVersion.getCreateTime());
updateTaskDefinitionLogs.add(taskDefinitionLog);
}
if (CollectionUtils.isNotEmpty(updateTaskDefinitionLogs)) {
List<Long> taskDefinitionCodes = updateTaskDefinitionLogs
.stream()
.map(TaskDefinition::getCode)
.distinct()
.collect(Collectors.toList());
Map<Long, TaskDefinition> taskDefinitionMap = taskDefinitionMapper.queryByCodeList(taskDefinitionCodes)
.stream()
.collect(Collectors.toMap(TaskDefinition::getCode, Function.identity()));
for (TaskDefinitionLog taskDefinitionToUpdate : updateTaskDefinitionLogs) {
TaskDefinition task = taskDefinitionMap.get(taskDefinitionToUpdate.getCode());
if (task == null) {
newTaskDefinitionLogs.add(taskDefinitionToUpdate);
} else {
taskDefinitionToUpdate.setId(task.getId());
}
}
}
// for each taskDefinitionLog, we will insert a new version into db
// and update the origin one if exist
int updateResult = 0;
int insertResult = 0;
// only insert new task definitions if they not in updateTaskDefinitionLogs
List<TaskDefinitionLog> newInsertTaskDefinitionLogs = newTaskDefinitionLogs.stream()
.filter(taskDefinitionLog -> !updateTaskDefinitionLogs.contains(taskDefinitionLog))
.collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(newInsertTaskDefinitionLogs)) {
insertResult = taskDefinitionLogMapper.batchInsert(newInsertTaskDefinitionLogs);
}
if (CollectionUtils.isNotEmpty(updateTaskDefinitionLogs)) {
insertResult += taskDefinitionLogMapper.batchInsert(updateTaskDefinitionLogs);
}
if (CollectionUtils.isNotEmpty(newTaskDefinitionLogs) && Boolean.TRUE.equals(syncDefine)) {
updateResult += taskDefinitionMapper.batchInsert(newTaskDefinitionLogs);
}
if (CollectionUtils.isNotEmpty(updateTaskDefinitionLogs) && Boolean.TRUE.equals(syncDefine)) {
for (TaskDefinitionLog taskDefinitionLog : updateTaskDefinitionLogs) {
updateResult += taskDefinitionMapper.updateById(taskDefinitionLog);
}
}
return (insertResult & updateResult) > 0 ? 1 : Constants.EXIT_CODE_SUCCESS;
}