in mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v0/JobDiscoveryRoute.java [82:235]
/**
 * Builds the v0 job-discovery routes. All three are GET endpoints that reply with a
 * server-sent-event stream and accept an optional boolean {@code sendHB} query parameter
 * (treated as {@code false} when absent) that is forwarded to the route handler:
 * <ul>
 *   <li>{@code /assignmentresults/{jobId}} - scheduling info stream of a job</li>
 *   <li>{@code /namedjobs/{jobCluster}} - last submitted job id stream of a job cluster</li>
 *   <li>{@code /jobScalerRules/{jobId}} - scaler rule stream of a job</li>
 * </ul>
 * A {@code jobId} path segment that {@code JobId.fromId} cannot parse is answered with
 * 400 Bad Request. A handler response that carries no stream is answered with
 * 500 Internal Server Error (or 404 Not Found when the scheduling-info handler reports
 * {@code CLIENT_ERROR_NOT_FOUND}).
 *
 * @return the combined route for the job discovery endpoints
 */
private Route getJobDiscoveryRoutes() {
    return route(
        get(() -> route(
            path(segment("assignmentresults").slash(PathMatchers.segment()), (jobId) ->
                extractClientIP(clientIp ->
                    parameterOptional(
                        StringUnmarshallers.BOOLEAN,
                        "sendHB",
                        (sendHeartbeats) -> {
                            logger.debug("/assignmentresults/{} called by {}", jobId, clientIp);
                            schedulingInfoStreamGET.increment();

                            // Reject malformed ids up front instead of calling Optional.get()
                            // blindly, which would surface as an unhandled exception.
                            Optional<JobId> parsedJobId = JobId.fromId(jobId);
                            if (!parsedJobId.isPresent()) {
                                logger.warn("Invalid job id {} in /assignmentresults request", jobId);
                                return complete(
                                    StatusCodes.BAD_REQUEST,
                                    "Invalid job id " + jobId);
                            }

                            JobClusterManagerProto.GetJobSchedInfoRequest req =
                                new JobClusterManagerProto.GetJobSchedInfoRequest(parsedJobId.get());
                            CompletionStage<JobDiscoveryRouteProto.SchedInfoResponse> schedulingInfoRespCS =
                                jobDiscoveryRouteHandler.schedulingInfoStream(
                                    req,
                                    sendHeartbeats.orElse(false));
                            return completeAsync(
                                schedulingInfoRespCS,
                                r -> {
                                    if (r.responseCode.equals(ResponseCode.CLIENT_ERROR_NOT_FOUND)) {
                                        logger.warn("Sched info stream not found for job {}", jobId);
                                        return complete(
                                            StatusCodes.NOT_FOUND,
                                            "Sched info stream not found for job " + jobId);
                                    }
                                    Optional<Observable<JobSchedulingInfo>> schedInfoStreamO =
                                        r.getSchedInfoStream();
                                    if (!schedInfoStreamO.isPresent()) {
                                        logger.warn("Failed to get sched info stream for job {}", jobId);
                                        return complete(
                                            StatusCodes.INTERNAL_SERVER_ERROR,
                                            "Failed to get sched info stream for job " + jobId);
                                    }
                                    return completeOK(
                                        toServerSentEvents(
                                            schedInfoStreamO.get(),
                                            info -> StreamingUtils.from(info)),
                                        EventStreamMarshalling.toEventStream());
                                });
                        }))
            ),
            path(segment("namedjobs").slash(PathMatchers.segment()), (jobCluster) ->
                parameterOptional(
                    StringUnmarshallers.BOOLEAN,
                    "sendHB",
                    (sendHeartbeats) -> {
                        logger.debug("/namedjobs/{} called", jobCluster);
                        jobClusterInfoStreamGET.increment();
                        JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest req =
                            new JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest(jobCluster);
                        CompletionStage<JobDiscoveryRouteProto.JobClusterInfoResponse> jobClusterInfoRespCS =
                            jobDiscoveryRouteHandler.lastSubmittedJobIdStream(
                                req,
                                sendHeartbeats.orElse(false));
                        return completeAsync(
                            jobClusterInfoRespCS,
                            r -> {
                                Optional<Observable<JobClusterInfo>> jobClusterInfoO =
                                    r.getJobClusterInfoObs();
                                if (!jobClusterInfoO.isPresent()) {
                                    logger.warn(
                                        "Failed to get last submitted jobId stream for {}",
                                        jobCluster);
                                    return complete(
                                        StatusCodes.INTERNAL_SERVER_ERROR,
                                        "Failed to get last submitted jobId stream for " + jobCluster);
                                }
                                return completeOK(
                                    toServerSentEvents(
                                        jobClusterInfoO.get(),
                                        info -> StreamingUtils.from(info)),
                                    EventStreamMarshalling.toEventStream());
                            });
                    })
            ),
            path(segment("jobScalerRules").slash(PathMatchers.segment()), (jobId) ->
                parameterOptional(
                    StringUnmarshallers.BOOLEAN,
                    "sendHB",
                    (sendHeartbeats) -> {
                        logger.debug("/jobScalerRules/{} called", jobId);
                        this.jobScalerRulesStreamGET.increment();

                        // Do not build a request around a null job id; report the bad input.
                        Optional<JobId> parsedJobId = JobId.fromId(jobId);
                        if (!parsedJobId.isPresent()) {
                            logger.warn("Invalid job id {} in /jobScalerRules request", jobId);
                            return complete(
                                StatusCodes.BAD_REQUEST,
                                "Invalid job id " + jobId);
                        }

                        JobClusterScalerRuleProto.GetJobScalerRuleStreamRequest req =
                            new JobClusterScalerRuleProto.GetJobScalerRuleStreamRequest(parsedJobId.get());
                        CompletionStage<JobClusterScalerRuleProto.GetJobScalerRuleStreamResponse> jobScalerRulesRespCS =
                            jobDiscoveryRouteHandler.jobScalerRuleStream(
                                req,
                                sendHeartbeats.orElse(false));
                        return completeAsync(
                            jobScalerRulesRespCS,
                            r -> {
                                Observable<JobScalerRuleInfo> jobScalerRulesInfoObs = r.getScalerRuleObs();
                                if (jobScalerRulesInfoObs == null) {
                                    logger.warn("Failed to get job scaler rules stream for {}", jobId);
                                    return complete(
                                        StatusCodes.INTERNAL_SERVER_ERROR,
                                        "Failed to get job scaler rules stream for " + jobId);
                                }
                                return completeOK(
                                    toServerSentEvents(
                                        jobScalerRulesInfoObs,
                                        info -> StreamingUtils.from(info)),
                                    EventStreamMarshalling.toEventStream());
                            });
                    })
            )
        ))
    );
}

/**
 * Adapts an Rx observable into an Akka Streams source of server-sent events.
 *
 * <p>Items for which {@code encoder} yields an empty {@code Optional} are dropped.
 * This is done with {@code mapConcat} (emit zero or one element) rather than by mapping
 * to {@code null} and filtering afterwards, because Akka Streams does not permit
 * {@code null} as a stream element.
 *
 * @param events  the observable to expose as a stream
 * @param encoder converts one item into its server-sent-event form, or empty to skip it
 * @param <T>     the item type of the observable
 * @return a source emitting the encoded events in the order the observable produced them
 */
private static <T> Source<ServerSentEvent, NotUsed> toServerSentEvents(
    Observable<T> events,
    java.util.function.Function<? super T, Optional<ServerSentEvent>> encoder) {
    return Source
        .fromPublisher(RxReactiveStreams.toPublisher(events))
        .mapConcat(item -> encoder.apply(item)
            .map(java.util.Collections::singletonList)
            .orElseGet(java.util.Collections::emptyList));
}