public List getJobDetail()

in src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobInfoService.java [299:410]


    public List<ExecutableStepResponse> getJobDetail(String project, String jobId) {
        aclEvaluate.checkProjectOperationPermission(project);
        ExecutablePO executablePO = jobInfoDao.getExecutablePOByUuid(jobId);
        if (executablePO == null) {
            throw new KylinException(JOB_NOT_EXIST, jobId);
        }
        AbstractExecutable executable = null;
        ExecutableManager executableManager = getManager(ExecutableManager.class, project);
        try {
            executable = executableManager.fromPO(executablePO);
        } catch (Exception e) {
            logger.error(PARSE_ERROR_MSG, e);
            return Collections.emptyList();
        }

        // waite time in output
        Map<String, String> waiteTimeMap;
        val output = executable.getOutput(executablePO);
        try (SetLogCategory ignored = new SetLogCategory(LogConstant.BUILD_CATEGORY)) {
            waiteTimeMap = JsonUtil.readValueAsMap(output.getExtra().getOrDefault(NBatchConstants.P_WAITE_TIME, "{}"));
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
            waiteTimeMap = Maps.newHashMap();
        }
        final String targetSubject = executable.getTargetSubject();
        List<ExecutableStepResponse> executableStepList = new ArrayList<>();
        List<? extends AbstractExecutable> tasks = ((ChainedExecutable) executable).getTasks();
        for (AbstractExecutable task : tasks) {
            final ExecutableStepResponse executableStepResponse = parseToExecutableStep(task, executablePO,
                    waiteTimeMap, output.getState());
            if (task.getStatusInMem() == ExecutableState.ERROR) {
                executableStepResponse.setFailedStepId(output.getFailedStepId());
                executableStepResponse.setFailedSegmentId(output.getFailedSegmentId());
                executableStepResponse.setFailedStack(output.getFailedStack());
                executableStepResponse.setFailedStepName(task.getName());

                setExceptionResolveAndCodeAndReason(output, executableStepResponse);
            }
            if (executable.getJobSchedulerMode() == JobSchedulerModeEnum.DAG
                    && task.getStatus() == ExecutableState.ERROR
                    && !org.apache.commons.lang3.StringUtils.startsWith(output.getFailedStepId(), task.getId())) {
                executableStepResponse.setStatus(JobStatusEnum.STOPPED);
            }
            if (task instanceof ChainedStageExecutable) {
                Map<String, List<StageExecutable>> stagesMap = Optional
                        .ofNullable(((ChainedStageExecutable) task).getStagesMap()).orElse(Maps.newHashMap());

                Map<String, ExecutableStepResponse.SubStages> stringSubStageMap = Maps.newHashMap();
                List<ExecutableStepResponse> subStages = Lists.newArrayList();

                for (Map.Entry<String, List<StageExecutable>> entry : stagesMap.entrySet()) {
                    String segmentId = entry.getKey();
                    ExecutableStepResponse.SubStages segmentSubStages = new ExecutableStepResponse.SubStages();

                    List<StageExecutable> stageExecutables = Optional.ofNullable(entry.getValue())
                            .orElse(Lists.newArrayList());
                    List<ExecutableStepResponse> stageResponses = Lists.newArrayList();
                    for (StageExecutable stage : stageExecutables) {
                        val stageResponse = parseStageToExecutableStep(task, stage,
                                executableManager.getOutput(stage.getId(), segmentId));
                        if (executable.getJobSchedulerMode() == JobSchedulerModeEnum.DAG
                                && stage.getStatus(segmentId) == ExecutableState.ERROR
                                && !org.apache.commons.lang3.StringUtils.startsWith(output.getFailedStepId(),
                                        stage.getId())) {
                            stageResponse.setStatus(JobStatusEnum.STOPPED);
                        }
                        setStage(subStages, stageResponse);
                        stageResponses.add(stageResponse);

                        if (StringUtils.equals(output.getFailedStepId(), stage.getId())) {
                            executableStepResponse.setFailedStepName(stage.getName());
                        }
                    }

                    // table sampling and snapshot table don't have some segment
                    if (!StringUtils.equals(task.getId(), segmentId)) {
                        setSegmentSubStageParams(project, targetSubject, task, segmentId, segmentSubStages,
                                stageExecutables,
                                stageResponses, waiteTimeMap, output.getState(), executablePO);
                        stringSubStageMap.put(segmentId, segmentSubStages);
                    }
                }
                if (MapUtils.isNotEmpty(stringSubStageMap)) {
                    executableStepResponse.setSegmentSubStages(stringSubStageMap);
                }
                if (CollectionUtils.isNotEmpty(subStages)) {
                    executableStepResponse.setSubStages(subStages);
                    if (MapUtils.isEmpty(stringSubStageMap) || stringSubStageMap.size() == 1) {
                        val taskDuration = subStages.stream() //
                                .map(ExecutableStepResponse::getDuration) //
                                .mapToLong(Long::valueOf).sum();
                        executableStepResponse.setDuration(taskDuration);

                    }
                }
            }
            executableStepList.add(executableStepResponse);
        }
        if (executable.getStatusInMem() == ExecutableState.DISCARDED) {
            executableStepList.forEach(executableStepResponse -> {
                executableStepResponse.setStatus(JobStatusEnum.DISCARDED);
                Optional.ofNullable(executableStepResponse.getSubStages()).orElse(Lists.newArrayList())
                        .forEach(subtask -> subtask.setStatus(JobStatusEnum.DISCARDED));
                val subStageMap = //
                        Optional.ofNullable(executableStepResponse.getSegmentSubStages()).orElse(Maps.newHashMap());
                for (Map.Entry<String, ExecutableStepResponse.SubStages> entry : subStageMap.entrySet()) {
                    entry.getValue().getStage().forEach(stage -> stage.setStatus(JobStatusEnum.DISCARDED));
                }
            });
        }
        return executableStepList;
    }