in mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/JobClusterActor.java [1217:1257]
/**
 * Builds a stream of metadata views for completed jobs that match the given criteria.
 *
 * <p>Nothing is emitted when the request names a job state other than
 * {@code JobState.MetaState.Terminal}, or when it names no job state at all but has
 * the active-only flag set to {@code true}.
 *
 * <p>Completed jobs are looked up individually through {@code jobManager} when
 * {@code jobIdSet} is non-empty; otherwise the completed jobs list is requested from
 * {@code jobManager} using the request's limit and exclusive start job id. In both
 * cases the result is truncated to at most the request's limit ({@code DEFAULT_LIMIT}
 * when none is given).
 *
 * <p>Jobs whose metadata is not found, or whose lookup/view construction throws, are
 * skipped; failures are logged and never propagated through the returned stream.
 *
 * @param request  criteria supplying the job state / active-only filters, the limit,
 *                 the exclusive start job id and the stage/worker filters handed to each view
 * @param jobIdSet job ids to restrict the lookup to; an empty set means "use the
 *                 completed jobs list from the job manager"
 * @return an {@code Observable} emitting one {@code MantisJobMetadataView} per
 *         completed job whose metadata could be fetched
 */
private Observable<MantisJobMetadataView> getFilteredTerminalJobList(ListJobCriteria request, Set<JobId> jobIdSet) {
    if (logger.isTraceEnabled()) {
        logger.trace("JobClusterActor:getFilteredTerminalJobList");
    }

    // An explicit job state decides on its own; the active-only flag is consulted
    // only when no job state was supplied.
    final boolean terminalJobsExcluded;
    if (request.getJobState().isPresent()) {
        terminalJobsExcluded = !request.getJobState().get().equals(JobState.MetaState.Terminal);
    } else {
        terminalJobsExcluded = request.getActiveOnly().isPresent() && request.getActiveOnly().get();
    }
    if (terminalJobsExcluded) {
        if (logger.isTraceEnabled()) {
            logger.trace("Exit JobClusterActor:getFilteredTerminalJobList with empty");
        }
        return Observable.empty();
    }

    final List<CompletedJob> completedJobs;
    if (jobIdSet.isEmpty()) {
        completedJobs = jobManager.getCompletedJobsList(
                request.getLimit().orElse(DEFAULT_LIMIT),
                request.getStartJobIdExclusive().orElse(null));
    } else {
        // Ids without a completed-job record are silently dropped.
        completedJobs = jobIdSet.stream()
                .map(jobManager::getCompletedJob)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
    }

    // NOTE(review): a negative limit would make subList throw here -- confirm the
    // limit is validated before it reaches this method.
    final int endExclusive = Math.min(completedJobs.size(), request.getLimit().orElse(DEFAULT_LIMIT));
    final List<CompletedJob> cappedJobs = completedJobs.subList(0, endExclusive);

    return Observable.from(cappedJobs)
            .flatMap((completedJob) -> {
                try {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Fetching details for completed job {}", completedJob);
                    }
                    final Optional<IMantisJobMetadata> jobMetadata =
                            jobManager.getJobDataForCompletedJob(completedJob.getJobId());
                    if (!jobMetadata.isPresent()) {
                        return Observable.empty();
                    }
                    if (logger.isDebugEnabled()) {
                        logger.debug ("Fetched details for completed job {} -> {}", completedJob, jobMetadata.get());
                    }
                    // terminatedAt is taken from the completed-job record rather than from
                    // the job metadata, hence the different constructor shape.
                    return Observable.just(new MantisJobMetadataView(
                            jobMetadata.get(),
                            completedJob.getTerminatedAt(),
                            request.getStageNumberList(),
                            request.getWorkerIndexList(),
                            request.getWorkerNumberList(),
                            request.getWorkerStateList(),
                            false));
                } catch (Exception e) {
                    // Best effort: one failing job must not fail the whole listing.
                    logger.error("caught exception", e);
                    return Observable.empty();
                }
            });
}