in dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java [1458:1593]
/**
 * Imports one workflow definition, together with its task definitions, task relations and
 * optional schedule, into the given project.
 *
 * <p>The imported workflow gets a newly generated name and code, and every task definition gets a
 * newly generated code. References to the old task codes in the task relations and in the
 * workflow's {@code locations} JSON are rewritten to the new codes.
 *
 * <p>Failures detected before anything is written (parameter check, name check, code generation)
 * are reported by returning {@code false} with the status placed in {@code result}. Failures
 * detected after an insert has been attempted are reported by throwing {@link ServiceException}.
 * NOTE(review): presumably the exception lets an enclosing transaction roll back - confirm
 * against the caller.
 *
 * @param loginUser       user performing the import; recorded as owner/operator of the new rows
 * @param projectCode     code of the project that receives the imported workflow
 * @param result          mutable map that receives the status/message; on success the imported
 *                        workflow definition is stored under {@code Constants.DATA_LIST}
 * @param dagDataSchedule import payload holding the workflow definition, its task definitions,
 *                        its task relations and an optional schedule
 * @return {@code true} if the import completed, {@code false} if it was rejected before any
 *         insert was attempted
 * @throws ServiceException if saving the task definitions, the workflow definition or the
 *                          schedule fails
 */
protected boolean checkAndImport(User loginUser,
                                 long projectCode,
                                 Map<String, Object> result,
                                 DagDataSchedule dagDataSchedule) {
    if (!checkImportanceParams(dagDataSchedule, result)) {
        return false;
    }
    WorkflowDefinition workflowDefinition = dagDataSchedule.getWorkflowDefinition();
    // generate import workflowDefinitionName
    String workflowDefinitionName = recursionWorkflowDefinitionName(projectCode, workflowDefinition.getName(), 1);
    String importWorkflowDefinitionName = getNewName(workflowDefinitionName, IMPORT_SUFFIX);
    // unique check
    Map<String, Object> checkResult =
            verifyWorkflowDefinitionName(loginUser, projectCode, importWorkflowDefinitionName, 0);
    if (!Status.SUCCESS.equals(checkResult.get(Constants.STATUS))) {
        result.putAll(checkResult);
        return false;
    }
    putMsg(result, Status.SUCCESS);

    // Detach the definition from its origin: new name, no id, target project, new owner, new code.
    workflowDefinition.setName(importWorkflowDefinitionName);
    workflowDefinition.setId(null);
    workflowDefinition.setProjectCode(projectCode);
    workflowDefinition.setUserId(loginUser.getId());
    try {
        workflowDefinition.setCode(CodeGenerateUtils.genCode());
    } catch (CodeGenerateException e) {
        log.error(
                "Save workflow definition error because generate workflow definition code error, projectCode:{}.",
                projectCode, e);
        putMsg(result, Status.CREATE_WORKFLOW_DEFINITION_ERROR);
        return false;
    }

    // A single timestamp is shared by every row written during this import.
    Date now = new Date();

    // Maps each task code found in the import payload to the code generated for it here.
    Map<Long, Long> taskCodeMap = new HashMap<>();
    List<TaskDefinition> taskDefinitionList = dagDataSchedule.getTaskDefinitionList();
    List<TaskDefinitionLog> taskDefinitionLogList = new ArrayList<>();
    for (TaskDefinition taskDefinition : taskDefinitionList) {
        TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(taskDefinition);
        taskDefinitionLog.setProjectCode(projectCode);
        taskDefinitionLog.setUserId(loginUser.getId());
        taskDefinitionLog.setVersion(Constants.VERSION_FIRST);
        taskDefinitionLog.setCreateTime(now);
        taskDefinitionLog.setUpdateTime(now);
        taskDefinitionLog.setOperator(loginUser.getId());
        taskDefinitionLog.setOperateTime(now);
        try {
            long code = CodeGenerateUtils.genCode();
            // Record old -> new before overwriting the code on the log object.
            taskCodeMap.put(taskDefinitionLog.getCode(), code);
            taskDefinitionLog.setCode(code);
        } catch (CodeGenerateException e) {
            log.error("Generate task definition code error, projectCode:{}, workflowDefinitionCode:{}",
                    projectCode, workflowDefinition.getCode(), e);
            putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating task definition code");
            return false;
        }
        taskDefinitionLogList.add(taskDefinitionLog);
    }

    int insert = taskDefinitionMapper.batchInsert(taskDefinitionLogList);
    int logInsert = taskDefinitionLogMapper.batchInsert(taskDefinitionLogList);
    // Compare each count explicitly: a bitwise AND of two non-zero counts can still be 0
    // (e.g. 1 & 2), which would report a failure even though both inserts wrote rows.
    if (insert == 0 || logInsert == 0) {
        log.error("Save task definition error, projectCode:{}, workflowDefinitionCode:{}", projectCode,
                workflowDefinition.getCode());
        putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
        throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
    }

    // Rewrite the relations so they point at the newly generated task codes.
    List<WorkflowTaskRelation> taskRelationList = dagDataSchedule.getWorkflowTaskRelationList();
    List<WorkflowTaskRelationLog> taskRelationLogList = new ArrayList<>();
    for (WorkflowTaskRelation workflowTaskRelation : taskRelationList) {
        WorkflowTaskRelationLog workflowTaskRelationLog = new WorkflowTaskRelationLog(workflowTaskRelation);
        Long newPreTaskCode = taskCodeMap.get(workflowTaskRelationLog.getPreTaskCode());
        if (newPreTaskCode != null) {
            workflowTaskRelationLog.setPreTaskCode(newPreTaskCode);
        }
        Long newPostTaskCode = taskCodeMap.get(workflowTaskRelationLog.getPostTaskCode());
        if (newPostTaskCode != null) {
            workflowTaskRelationLog.setPostTaskCode(newPostTaskCode);
        }
        workflowTaskRelationLog.setPreTaskVersion(Constants.VERSION_FIRST);
        workflowTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST);
        taskRelationLogList.add(workflowTaskRelationLog);
    }

    // Rewrite the locations JSON, keeping only entries whose task code was re-mapped.
    if (StringUtils.isNotEmpty(workflowDefinition.getLocations())
            && JSONUtils.checkJsonValid(workflowDefinition.getLocations())) {
        ArrayNode arrayNode = JSONUtils.parseArray(workflowDefinition.getLocations());
        ArrayNode newArrayNode = JSONUtils.createArrayNode();
        for (int i = 0; i < arrayNode.size(); i++) {
            JsonNode jsonNode = arrayNode.get(i);
            JsonNode oldTaskCodeNode = jsonNode.get("taskCode");
            if (oldTaskCodeNode == null) {
                // Entry carries no task code, so there is nothing to re-map it to.
                continue;
            }
            Long taskCode = taskCodeMap.get(oldTaskCodeNode.asLong());
            if (Objects.isNull(taskCode)) {
                continue;
            }
            // Append the node only now, so skipped entries do not leave an empty {} behind.
            ObjectNode newObjectNode = newArrayNode.addObject();
            newObjectNode.put("taskCode", taskCode);
            newObjectNode.set("x", jsonNode.get("x"));
            newObjectNode.set("y", jsonNode.get("y"));
        }
        workflowDefinition.setLocations(newArrayNode.toString());
    }

    workflowDefinition.setCreateTime(now);
    workflowDefinition.setUpdateTime(now);
    Map<String, Object> createDagResult =
            createDagDefine(loginUser, taskRelationLogList, workflowDefinition, Lists.newArrayList());
    if (!Status.SUCCESS.equals(createDagResult.get(Constants.STATUS))) {
        result.putAll(createDagResult);
        log.error("Import workflow definition error, projectCode:{}, workflowDefinitionCode:{}.", projectCode,
                workflowDefinition.getCode());
        throw new ServiceException(Status.IMPORT_WORKFLOW_DEFINE_ERROR);
    }

    Schedule schedule = dagDataSchedule.getSchedule();
    if (null != schedule) {
        // The code was assigned above, so it can be used directly without reading the row back.
        schedule.setWorkflowDefinitionCode(workflowDefinition.getCode());
        schedule.setId(null);
        schedule.setUserId(loginUser.getId());
        schedule.setCreateTime(now);
        schedule.setUpdateTime(now);
        int scheduleInsert = scheduleMapper.insert(schedule);
        if (0 == scheduleInsert) {
            log.error(
                    "Import workflow definition error due to save schedule fail, projectCode:{}, workflowDefinitionCode:{}.",
                    projectCode, workflowDefinition.getCode());
            putMsg(result, Status.IMPORT_WORKFLOW_DEFINE_ERROR);
            throw new ServiceException(Status.IMPORT_WORKFLOW_DEFINE_ERROR);
        }
    }

    result.put(Constants.DATA_LIST, workflowDefinition);
    log.info("Import workflow definition complete, projectCode:{}, workflowDefinitionCode:{}.", projectCode,
            workflowDefinition.getCode());
    return true;
}