in src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobInfoService.java [299:410]
/**
 * Builds the per-step detail view of one job.
 *
 * <p>The job is loaded by its id, converted to an executable and each of its tasks is turned
 * into an {@link ExecutableStepResponse}. Tasks that are {@code ChainedStageExecutable}s
 * additionally get their stages attached, both as a flat list and grouped per segment.
 * If the job itself is DISCARDED, every step and stage in the result is reported as DISCARDED.
 *
 * @param project project the job belongs to; the caller must hold operation permission on it
 * @param jobId   uuid of the job to describe
 * @return one response per task of the job, or an empty list when the stored job cannot be
 *         converted to an executable
 * @throws KylinException with {@code JOB_NOT_EXIST} when no job is stored under {@code jobId}
 */
public List<ExecutableStepResponse> getJobDetail(String project, String jobId) {
    aclEvaluate.checkProjectOperationPermission(project);

    ExecutablePO jobPO = jobInfoDao.getExecutablePOByUuid(jobId);
    if (jobPO == null) {
        throw new KylinException(JOB_NOT_EXIST, jobId);
    }

    ExecutableManager manager = getManager(ExecutableManager.class, project);
    AbstractExecutable job;
    try {
        job = manager.fromPO(jobPO);
    } catch (Exception e) {
        // An unparsable job is reported as "no steps" rather than as a failure.
        logger.error(PARSE_ERROR_MSG, e);
        return Collections.emptyList();
    }

    // Wait times are stored as a JSON map inside the extra info of the job output.
    val jobOutput = job.getOutput(jobPO);
    Map<String, String> waitTimes;
    try (SetLogCategory ignored = new SetLogCategory(LogConstant.BUILD_CATEGORY)) {
        waitTimes = JsonUtil
                .readValueAsMap(jobOutput.getExtra().getOrDefault(NBatchConstants.P_WAITE_TIME, "{}"));
    } catch (IOException e) {
        logger.error(e.getMessage(), e);
        waitTimes = Maps.newHashMap();
    }

    final String subject = job.getTargetSubject();
    List<ExecutableStepResponse> stepResponses = new ArrayList<>();
    // NOTE(review): the job is assumed to be a ChainedExecutable; anything else fails the cast.
    List<? extends AbstractExecutable> subTasks = ((ChainedExecutable) job).getTasks();

    for (AbstractExecutable task : subTasks) {
        final ExecutableStepResponse stepResponse = parseToExecutableStep(task, jobPO, waitTimes,
                jobOutput.getState());

        // A task in ERROR carries the failure details recorded on the job output.
        if (task.getStatusInMem() == ExecutableState.ERROR) {
            stepResponse.setFailedStepId(jobOutput.getFailedStepId());
            stepResponse.setFailedSegmentId(jobOutput.getFailedSegmentId());
            stepResponse.setFailedStack(jobOutput.getFailedStack());
            stepResponse.setFailedStepName(task.getName());
            setExceptionResolveAndCodeAndReason(jobOutput, stepResponse);
        }

        // In DAG mode an ERROR task that is not the recorded failed step is shown as STOPPED.
        if (job.getJobSchedulerMode() == JobSchedulerModeEnum.DAG
                && task.getStatus() == ExecutableState.ERROR) {
            boolean isFailedStep = org.apache.commons.lang3.StringUtils
                    .startsWith(jobOutput.getFailedStepId(), task.getId());
            if (!isFailedStep) {
                stepResponse.setStatus(JobStatusEnum.STOPPED);
            }
        }

        // Plain tasks have no stages: nothing more to attach.
        if (!(task instanceof ChainedStageExecutable)) {
            stepResponses.add(stepResponse);
            continue;
        }

        Map<String, List<StageExecutable>> stagesBySegment = Optional
                .ofNullable(((ChainedStageExecutable) task).getStagesMap()).orElseGet(Maps::newHashMap);
        Map<String, ExecutableStepResponse.SubStages> subStagesBySegment = Maps.newHashMap();
        List<ExecutableStepResponse> flatStages = Lists.newArrayList();

        for (Map.Entry<String, List<StageExecutable>> segmentEntry : stagesBySegment.entrySet()) {
            String segmentId = segmentEntry.getKey();
            ExecutableStepResponse.SubStages segmentSubStages = new ExecutableStepResponse.SubStages();

            List<StageExecutable> stages = segmentEntry.getValue();
            if (stages == null) {
                stages = Lists.newArrayList();
            }

            List<ExecutableStepResponse> segmentStageResponses = Lists.newArrayList();
            for (StageExecutable stage : stages) {
                val stageResponse = parseStageToExecutableStep(task, stage,
                        manager.getOutput(stage.getId(), segmentId));

                // Same DAG rule as for tasks, applied to the individual stage.
                if (job.getJobSchedulerMode() == JobSchedulerModeEnum.DAG
                        && stage.getStatus(segmentId) == ExecutableState.ERROR) {
                    boolean isFailedStage = org.apache.commons.lang3.StringUtils
                            .startsWith(jobOutput.getFailedStepId(), stage.getId());
                    if (!isFailedStage) {
                        stageResponse.setStatus(JobStatusEnum.STOPPED);
                    }
                }

                setStage(flatStages, stageResponse);
                segmentStageResponses.add(stageResponse);

                // When the recorded failed step is this stage, its name replaces the task name.
                if (StringUtils.equals(jobOutput.getFailedStepId(), stage.getId())) {
                    stepResponse.setFailedStepName(stage.getName());
                }
            }

            // Entries keyed by the task id itself get no per-segment block; the original note
            // attributes this to table sampling and snapshot jobs, which have no segment.
            if (!StringUtils.equals(task.getId(), segmentId)) {
                setSegmentSubStageParams(project, subject, task, segmentId, segmentSubStages, stages,
                        segmentStageResponses, waitTimes, jobOutput.getState(), jobPO);
                subStagesBySegment.put(segmentId, segmentSubStages);
            }
        }

        if (MapUtils.isNotEmpty(subStagesBySegment)) {
            stepResponse.setSegmentSubStages(subStagesBySegment);
        }
        if (CollectionUtils.isNotEmpty(flatStages)) {
            stepResponse.setSubStages(flatStages);
            // With at most one segment block the task duration is the sum of its stage durations.
            if (subStagesBySegment.size() <= 1) {
                long stageDurationSum = 0L;
                for (ExecutableStepResponse flatStage : flatStages) {
                    stageDurationSum += flatStage.getDuration();
                }
                stepResponse.setDuration(stageDurationSum);
            }
        }
        stepResponses.add(stepResponse);
    }

    // A discarded job overrides the status of every step and stage in the response.
    if (job.getStatusInMem() == ExecutableState.DISCARDED) {
        for (ExecutableStepResponse step : stepResponses) {
            step.setStatus(JobStatusEnum.DISCARDED);

            val flatStages = step.getSubStages();
            if (flatStages != null) {
                flatStages.forEach(subtask -> subtask.setStatus(JobStatusEnum.DISCARDED));
            }

            val subStagesBySegment = step.getSegmentSubStages();
            if (subStagesBySegment != null) {
                for (ExecutableStepResponse.SubStages segmentSubStages : subStagesBySegment.values()) {
                    segmentSubStages.getStage().forEach(stage -> stage.setStatus(JobStatusEnum.DISCARDED));
                }
            }
        }
    }
    return stepResponses;
}