in dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java [2095:2228]
protected void doBatchOperateWorkflowDefinition(User loginUser,
long targetProjectCode,
List<String> failedWorkflowList,
String workflowDefinitionCodes,
Map<String, Object> result,
boolean isCopy) {
Set<Long> definitionCodes = Arrays.stream(workflowDefinitionCodes.split(Constants.COMMA)).map(Long::parseLong)
.collect(Collectors.toSet());
List<WorkflowDefinition> workflowDefinitionList = workflowDefinitionMapper.queryByCodes(definitionCodes);
Set<Long> queryCodes =
workflowDefinitionList.stream().map(WorkflowDefinition::getCode).collect(Collectors.toSet());
// definitionCodes - queryCodes
Set<Long> diffCode =
definitionCodes.stream().filter(code -> !queryCodes.contains(code)).collect(Collectors.toSet());
diffCode.forEach(code -> failedWorkflowList.add(code + "[null]"));
for (WorkflowDefinition workflowDefinition : workflowDefinitionList) {
List<WorkflowTaskRelation> workflowTaskRelations =
workflowTaskRelationMapper.queryByWorkflowDefinitionCode(workflowDefinition.getCode());
List<WorkflowTaskRelationLog> taskRelationList =
workflowTaskRelations.stream().map(WorkflowTaskRelationLog::new).collect(Collectors.toList());
workflowDefinition.setProjectCode(targetProjectCode);
if (isCopy) {
log.info("Copy workflow definition...");
List<TaskDefinitionLog> taskDefinitionLogs =
taskDefinitionLogDao.queryTaskDefineLogList(workflowTaskRelations);
Map<Long, Long> taskCodeMap = new HashMap<>();
taskDefinitionLogs.forEach(taskDefinitionLog -> {
try {
taskCodeMap.put(taskDefinitionLog.getCode(), CodeGenerateUtils.genCode());
} catch (CodeGenerateException e) {
log.error("Generate task definition code error, projectCode:{}.", targetProjectCode, e);
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
throw new ServiceException(Status.INTERNAL_SERVER_ERROR_ARGS);
}
});
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
taskDefinitionLog.setCode(taskCodeMap.get(taskDefinitionLog.getCode()));
taskDefinitionLog.setProjectCode(targetProjectCode);
taskDefinitionLog.setVersion(0);
taskDefinitionLog.setName(taskDefinitionLog.getName());
if (TaskTypeUtils.isSwitchTask(taskDefinitionLog.getTaskType())) {
final String taskParams = taskDefinitionLog.getTaskParams();
final SwitchParameters switchParameters =
JSONUtils.parseObject(taskParams, SwitchParameters.class);
if (switchParameters == null) {
throw new IllegalArgumentException(
"Switch task params: " + taskParams + " is invalid.");
}
SwitchParameters.SwitchResult switchResult = switchParameters.getSwitchResult();
switchResult.getDependTaskList().forEach(switchResultVo -> {
switchResultVo.setNextNode(taskCodeMap.get(switchResultVo.getNextNode()));
});
if (switchResult.getNextNode() != null) {
switchResult.setNextNode(
taskCodeMap.get(switchResult.getNextNode()));
}
taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(switchParameters));
}
}
for (WorkflowTaskRelationLog workflowTaskRelationLog : taskRelationList) {
if (workflowTaskRelationLog.getPreTaskCode() > 0) {
workflowTaskRelationLog
.setPreTaskCode(taskCodeMap.get(workflowTaskRelationLog.getPreTaskCode()));
}
if (workflowTaskRelationLog.getPostTaskCode() > 0) {
workflowTaskRelationLog
.setPostTaskCode(taskCodeMap.get(workflowTaskRelationLog.getPostTaskCode()));
}
}
final long oldWorkflowDefinitionCode = workflowDefinition.getCode();
try {
workflowDefinition.setCode(CodeGenerateUtils.genCode());
} catch (CodeGenerateException e) {
log.error("Generate workflow definition code error, projectCode:{}.", targetProjectCode, e);
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
throw new ServiceException(Status.INTERNAL_SERVER_ERROR_ARGS);
}
workflowDefinition.setId(null);
workflowDefinition.setUserId(loginUser.getId());
workflowDefinition.setName(getNewName(workflowDefinition.getName(), COPY_SUFFIX));
final Date date = new Date();
workflowDefinition.setCreateTime(date);
workflowDefinition.setUpdateTime(date);
workflowDefinition.setReleaseState(ReleaseState.OFFLINE);
if (StringUtils.isNotBlank(workflowDefinition.getLocations())) {
ArrayNode jsonNodes = JSONUtils.parseArray(workflowDefinition.getLocations());
for (int i = 0; i < jsonNodes.size(); i++) {
ObjectNode node = (ObjectNode) jsonNodes.path(i);
node.put("taskCode", taskCodeMap.get(node.get("taskCode").asLong()));
jsonNodes.set(i, node);
}
workflowDefinition.setLocations(JSONUtils.toJsonString(jsonNodes));
}
// copy timing configuration
Schedule scheduleObj = scheduleMapper.queryByWorkflowDefinitionCode(oldWorkflowDefinitionCode);
if (scheduleObj != null) {
scheduleObj.setId(null);
scheduleObj.setUserId(loginUser.getId());
scheduleObj.setWorkflowDefinitionCode(workflowDefinition.getCode());
scheduleObj.setReleaseState(ReleaseState.OFFLINE);
scheduleObj.setCreateTime(date);
scheduleObj.setUpdateTime(date);
int insertResult = scheduleMapper.insert(scheduleObj);
if (insertResult != 1) {
log.error("Schedule create error, workflowDefinitionCode:{}.", workflowDefinition.getCode());
putMsg(result, Status.CREATE_SCHEDULE_ERROR);
throw new ServiceException(Status.CREATE_SCHEDULE_ERROR);
}
}
try {
result.putAll(createDagDefine(loginUser, taskRelationList, workflowDefinition, taskDefinitionLogs));
} catch (Exception e) {
log.error("Copy workflow definition error, workflowDefinitionCode from {} to {}.",
oldWorkflowDefinitionCode, workflowDefinition.getCode(), e);
putMsg(result, Status.COPY_WORKFLOW_DEFINITION_ERROR);
throw new ServiceException(Status.COPY_WORKFLOW_DEFINITION_ERROR);
}
} else {
log.info("Move workflow definition...");
try {
result.putAll(updateDagDefine(loginUser, taskRelationList, workflowDefinition, null,
Lists.newArrayList()));
} catch (Exception e) {
log.error("Move workflow definition error, workflowDefinitionCode:{}.",
workflowDefinition.getCode(), e);
putMsg(result, Status.MOVE_WORKFLOW_DEFINITION_ERROR);
throw new ServiceException(Status.MOVE_WORKFLOW_DEFINITION_ERROR);
}
}
if (result.get(Constants.STATUS) != Status.SUCCESS) {
failedWorkflowList.add(workflowDefinition.getCode() + "[" + workflowDefinition.getName() + "]");
}
}
}