in dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java [1853:1983]
/**
 * Builds the tree view of a workflow definition: a synthetic root node named "DAG" whose
 * descendants mirror the definition's DAG, where every node carries one {@code Instance}
 * entry per considered workflow instance.
 *
 * <p>Checks are performed in this order, and the first failing one determines the result:
 * project authorization, existence of the workflow definition inside the given project,
 * validity of {@code limit}.
 *
 * @param loginUser   user handed to the project authorization check
 * @param projectCode code of the project the workflow definition must belong to
 * @param code        workflow definition code
 * @param limit       passed to the workflow instance query and used as an upper bound on how
 *                    many of the returned instances are rendered; must be non-null and
 *                    non-negative
 * @return the result map; on success it holds {@code Constants.DATA_LIST} (the root
 *         {@code TreeViewDto}), {@code Constants.STATUS} and {@code Constants.MSG}, otherwise
 *         the status/message of the failed check
 */
public Map<String, Object> viewTree(User loginUser, long projectCode, long code, Integer limit) {
    Project project = projectMapper.queryByCode(projectCode);
    // check user access for project
    Map<String, Object> result =
            projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_TREE_VIEW);
    if (result.get(Constants.STATUS) != Status.SUCCESS) {
        return result;
    }
    WorkflowDefinition workflowDefinition = workflowDefinitionMapper.queryByCode(code);
    if (null == workflowDefinition || projectCode != workflowDefinition.getProjectCode()) {
        log.error("workflow definition does not exist, code:{}.", code);
        putMsg(result, Status.WORKFLOW_DEFINITION_NOT_EXIST, String.valueOf(code));
        return result;
    }
    // Validate before building the DAG or touching the database, so an invalid limit is never
    // forwarded to the instance query and a null limit cannot fail on unboxing.
    if (limit == null || limit < 0) {
        putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
        return result;
    }

    DAG<Long, TaskNode, TaskNodeRelation> dag = processService.genDagGraph(workflowDefinition);

    // List of workflow instances
    List<WorkflowInstance> workflowInstanceList =
            workflowInstanceService.queryByWorkflowDefinitionCode(code, limit);
    workflowInstanceList.forEach(workflowInstance -> workflowInstance.setDuration(
            DateUtils.format2Duration(workflowInstance.getStartTime(), workflowInstance.getEndTime())));

    List<TaskDefinitionLog> taskDefinitionList = taskDefinitionLogDao.queryByWorkflowDefinitionCodeAndVersion(
            workflowDefinition.getCode(), workflowDefinition.getVersion());
    // Keep the first entry on a duplicate task code instead of failing the whole request.
    Map<Long, TaskDefinitionLog> taskDefinitionMap = taskDefinitionList.stream()
            .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog,
                    (first, duplicate) -> first));

    // Never render more instances than the query actually returned.
    int instanceCount = Math.min(limit, workflowInstanceList.size());

    TreeViewDto rootTreeViewDto = new TreeViewDto();
    rootTreeViewDto.setName("DAG");
    rootTreeViewDto.setType("");
    rootTreeViewDto.setCode(0L);
    // Specify the workflow definition, because it is a TreeView for a workflow definition.
    // Instances are added in reverse order of the queried list.
    for (int i = instanceCount - 1; i >= 0; i--) {
        WorkflowInstance workflowInstance = workflowInstanceList.get(i);
        // An instance without an end time is measured up to "now".
        Date endTime = workflowInstance.getEndTime() == null ? new Date() : workflowInstance.getEndTime();
        rootTreeViewDto.getInstances()
                .add(new Instance(workflowInstance.getId(), workflowInstance.getName(),
                        workflowInstance.getWorkflowDefinitionCode(),
                        "", workflowInstance.getState().name(), workflowInstance.getStartTime(), endTime,
                        workflowInstance.getHost(),
                        DateUtils.format2Readable(endTime.getTime() - workflowInstance.getStartTime().getTime())));
    }

    // nodes of the DAG level currently being rendered, mapped to the tree nodes they hang under
    Map<Long, List<TreeViewDto>> runningNodeMap = new ConcurrentHashMap<>();
    // nodes of the next DAG level, collected while the current level is rendered
    Map<Long, List<TreeViewDto>> waitingRunningNodeMap = new ConcurrentHashMap<>();

    List<TreeViewDto> rootAsParentList = new ArrayList<>();
    rootAsParentList.add(rootTreeViewDto);
    // Here is the encapsulation task instance: the DAG's begin nodes hang directly under the root.
    for (Long startNode : dag.getBeginNode()) {
        runningNodeMap.put(startNode, rootAsParentList);
    }

    // Level-by-level walk over the DAG. If the server is stopping the walk ends early and the
    // tree built so far is returned.
    while (!ServerLifeCycleManager.isStopped()) {
        Iterator<Map.Entry<Long, List<TreeViewDto>>> iter = runningNodeMap.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<Long, List<TreeViewDto>> en = iter.next();
            Long nodeCode = en.getKey();
            List<TreeViewDto> parentTreeViewDtoList = en.getValue();

            TaskNode taskNode = dag.getNode(nodeCode);
            TreeViewDto treeViewDto = new TreeViewDto();
            treeViewDto.setType(taskNode.getType());
            treeViewDto.setCode(taskNode.getCode());
            treeViewDto.setName(taskNode.getName());

            // set treeViewDto instances
            // NOTE(review): this issues one task instance query per node and workflow instance;
            // consider a batched lookup if this endpoint shows up in profiling.
            for (int i = instanceCount - 1; i >= 0; i--) {
                WorkflowInstance workflowInstance = workflowInstanceList.get(i);
                TaskInstance taskInstance =
                        taskInstanceMapper.queryByInstanceIdAndCode(workflowInstance.getId(), nodeCode);
                if (taskInstance == null) {
                    // placeholder for a task that has no task instance in this workflow instance
                    treeViewDto.getInstances().add(new Instance(-1, "not running", 0, "null"));
                    continue;
                }
                // Missing timestamps fall back to "now" for the duration only; the raw
                // (possibly null) timestamps are still what gets handed to the Instance.
                Date startTime = taskInstance.getStartTime() == null ? new Date() : taskInstance.getStartTime();
                Date endTime = taskInstance.getEndTime() == null ? new Date() : taskInstance.getEndTime();

                // if the task is a sub workflow, return the sub workflow code, otherwise 0
                long subWorkflowCode = 0L;
                if (TaskTypeUtils.isSubWorkflowTask(taskInstance.getTaskType())) {
                    TaskDefinition taskDefinition = taskDefinitionMap.get(taskInstance.getTaskCode());
                    if (taskDefinition == null) {
                        log.warn("task definition not found for sub workflow task, taskCode:{}, workflow code:{}.",
                                taskInstance.getTaskCode(), code);
                    } else {
                        // asLong(0L) yields 0 when the field is absent or not numeric instead of
                        // throwing NumberFormatException.
                        subWorkflowCode = JSONUtils.parseObject(taskDefinition.getTaskParams())
                                .path(CMD_PARAM_SUB_WORKFLOW_DEFINITION_CODE)
                                .asLong(0L);
                    }
                }
                treeViewDto.getInstances().add(new Instance(taskInstance.getId(), taskInstance.getName(),
                        taskInstance.getTaskCode(),
                        taskInstance.getTaskType(), taskInstance.getState().name(),
                        taskInstance.getStartTime(), taskInstance.getEndTime(),
                        taskInstance.getHost(),
                        DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subWorkflowCode));
            }

            // attach the rendered node under every parent that leads to it
            for (TreeViewDto pTreeViewDto : parentTreeViewDtoList) {
                pTreeViewDto.getChildren().add(treeViewDto);
            }

            // queue the successors for the next level, with this node as one of their parents
            Set<Long> postNodeList = dag.getSubsequentNodes(nodeCode);
            if (CollectionUtils.isNotEmpty(postNodeList)) {
                for (Long nextNodeCode : postNodeList) {
                    waitingRunningNodeMap
                            .computeIfAbsent(nextNodeCode, key -> new ArrayList<>())
                            .add(treeViewDto);
                }
            }
            iter.remove();
        }
        if (waitingRunningNodeMap.isEmpty()) {
            break;
        }
        runningNodeMap.putAll(waitingRunningNodeMap);
        waitingRunningNodeMap.clear();
    }

    result.put(Constants.DATA_LIST, rootTreeViewDto);
    result.put(Constants.STATUS, Status.SUCCESS);
    result.put(Constants.MSG, Status.SUCCESS.getMsg());
    return result;
}