in src/job-service/src/main/java/org/apache/kylin/rest/config/initialize/JobSyncListener.java [184:259]
/**
 * Builds the {@link JobInfo} callback payload for a finished job.
 *
 * <p>The notifier's subject is first looked up as a table. When no table with that name exists,
 * the subject is looked up as a dataflow and, for every segment id carried by the notifier that
 * still exists in the dataflow, the segment time range (and, for multi-partition models, the
 * built partitions) are collected. When the notifier carries a throwable, error code, message,
 * suggestion and stack trace are derived from it.
 *
 * @param notifier the job-finished event to convert
 * @return the populated {@link JobInfo}; {@code modelId} is the subject only when the subject is
 *         not a table, and a job state of {@code SUICIDAL} is reported as {@code DISCARDED}
 */
static JobInfo extractJobInfo(JobFinishedNotifier notifier) {
    List<SegRange> segRangeList = Lists.newArrayList();
    List<SegmentPartitionsInfo> segmentPartitionsInfoList = Lists.newArrayList();
    String project = notifier.getProject();
    String subject = notifier.getSubject();
    KylinConfig envConfig = KylinConfig.getInstanceFromEnv();
    TableDesc tableDesc = NTableMetadataManager.getInstance(envConfig, project).getTableDesc(subject);
    if (tableDesc == null) {
        // The subject is not a table, so try to resolve it as a dataflow.
        NDataflow dataflow = NDataflowManager.getInstance(envConfig, project).getDataflow(subject);
        if (dataflow != null && CollectionUtils.isNotEmpty(notifier.getSegmentIds())) {
            collectSegmentInfo(dataflow, notifier, segRangeList, segmentPartitionsInfoList);
        }
    }

    String errorCode = null;
    String suggestion = null;
    String msg = null;
    String code = null;
    String stacktrace = null;
    Throwable throwable = notifier.getThrowable();
    if (throwable != null) {
        Throwable rootCause = Throwables.getRootCause(throwable);
        // Must run before the messages below are read.
        // NOTE(review): presumably this selects the locale used by the localized strings - confirm.
        KylinConfig kylinConfig = getOverrideConfig(project);
        setLanguage(kylinConfig.getJobCallbackLanguage());
        if (rootCause instanceof KylinException) {
            KylinException kylinException = (KylinException) rootCause;
            msg = rootCause.getLocalizedMessage();
            code = kylinException.getCode();
            suggestion = kylinException.getSuggestionString();
            errorCode = kylinException.getErrorCodeString();
            stacktrace = Throwables.getStackTraceAsString(rootCause);
        } else {
            // Any other root cause is reported with the generic NON_KE_EXCEPTION details; the
            // stack trace is taken from the outer throwable rather than from the root cause.
            errorCode = NON_KE_EXCEPTION.getErrorCode().getCode();
            msg = NON_KE_EXCEPTION.getCodeMsg();
            suggestion = NON_KE_EXCEPTION.getErrorSuggest().getLocalizedString();
            code = KylinException.CODE_UNDEFINED;
            stacktrace = Throwables.getStackTraceAsString(throwable);
        }
    }

    String reportedState = "SUICIDAL".equalsIgnoreCase(notifier.getJobState()) ? "DISCARDED"
            : notifier.getJobState();
    return JobInfo.builder().jobId(notifier.getJobId()).project(project)
            .modelId(tableDesc == null ? subject : null).segmentIds(notifier.getSegmentIds())
            .indexIds(notifier.getLayoutIds()).duration(notifier.getDuration())
            .state(reportedState)
            .jobType(notifier.getJobType()).segRanges(segRangeList)
            .segmentPartitionInfoList(segmentPartitionsInfoList)
            .snapshotJobInfo(getSnapshotJobInfo(tableDesc, notifier)).startTime(notifier.getStartTime())
            .endTime(notifier.getEndTime()).tag(notifier.getTag()).errorCode(errorCode).suggestion(suggestion)
            .msg(msg).code(code).stacktrace(stacktrace).build();
}

/**
 * Appends, for each of the notifier's segment ids that exists in {@code dataflow}, the segment's
 * time range to {@code segRangeList} and, when the model has a multi-partition description and
 * the notifier lists partitions for that segment, the matching partitions to
 * {@code segmentPartitionsInfoList}.
 *
 * <p>Segment ids that are not found in the dataflow are skipped. A {@code null} partitions map on
 * the notifier is treated as "no partitions" instead of failing.
 *
 * @param dataflow the dataflow the job was run against
 * @param notifier the job-finished event providing segment ids and per-segment partition ids
 * @param segRangeList output list receiving one entry per existing segment
 * @param segmentPartitionsInfoList output list receiving one entry per segment with partitions
 */
private static void collectSegmentInfo(NDataflow dataflow, JobFinishedNotifier notifier,
        List<SegRange> segRangeList, List<SegmentPartitionsInfo> segmentPartitionsInfoList) {
    val partitionDesc = dataflow.getModel().getMultiPartitionDesc();
    // Fetched once: the map does not change while iterating over the segments.
    val segmentPartitionsMap = notifier.getSegmentPartitionsMap();
    for (String id : notifier.getSegmentIds()) {
        NDataSegment segment = dataflow.getSegment(id);
        if (segment == null) {
            continue;
        }
        TimeRange segRange = segment.getTSRange();
        segRangeList.add(new SegRange(id, segRange.getStart(), segRange.getEnd()));

        if (partitionDesc == null || segmentPartitionsMap == null) {
            continue;
        }
        // Looked up once per segment instead of once per partition inside the stream filter.
        val partitionIds = segmentPartitionsMap.get(id);
        if (partitionIds == null || partitionIds.isEmpty()) {
            continue;
        }
        List<SegmentPartitionResponse> segmentPartitionResponseList = segment.getMultiPartitions().stream()
                .filter(partition -> partitionIds.contains(partition.getPartitionId()))
                .map(partition -> {
                    // NOTE(review): assumes getPartitionInfo never returns null for a partition
                    // that is still present on the segment - confirm.
                    val partitionInfo = partitionDesc.getPartitionInfo(partition.getPartitionId());
                    return new SegmentPartitionResponse(partitionInfo.getId(),
                            partitionInfo.getValues(), partition.getStatus(),
                            partition.getLastBuildTime(), partition.getSourceCount(),
                            partition.getStorageSize());
                }).collect(Collectors.toList());
        segmentPartitionsInfoList.add(new SegmentPartitionsInfo(id, segmentPartitionResponseList));
    }
}