private Route getJobDiscoveryRoutes()

in mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v0/JobDiscoveryRoute.java [82:235]


    private Route getJobDiscoveryRoutes() {
        return route(
                get(() -> route(
                        path(segment("assignmentresults").slash(PathMatchers.segment()), (jobId) ->
                                extractClientIP(clientIp ->
                                    parameterOptional(
                                        StringUnmarshallers.BOOLEAN,
                                        "sendHB",
                                        (sendHeartbeats) -> {
                                            logger.debug(
                                                    "/assignmentresults/{} called by {}",
                                                    jobId, clientIp);
                                            schedulingInfoStreamGET.increment();
                                            JobClusterManagerProto.GetJobSchedInfoRequest req =
                                                    new JobClusterManagerProto.GetJobSchedInfoRequest(
                                                            JobId.fromId(jobId).get());

                                            CompletionStage<JobDiscoveryRouteProto.SchedInfoResponse> schedulingInfoRespCS =
                                                    jobDiscoveryRouteHandler.schedulingInfoStream(
                                                            req,
                                                            sendHeartbeats.orElse(false));

                                            return completeAsync(
                                                    schedulingInfoRespCS,
                                                    r -> {
                                                        if (r.responseCode.equals(ResponseCode.CLIENT_ERROR_NOT_FOUND)) {
                                                            logger.warn(
                                                                "Sched info stream not found for job {}",
                                                                jobId);
                                                            return complete(
                                                                StatusCodes.NOT_FOUND,
                                                                "Sched info stream not found for job " +
                                                                    jobId);
                                                        }

                                                        Optional<Observable<JobSchedulingInfo>> schedInfoStreamO = r
                                                                .getSchedInfoStream();
                                                        if (schedInfoStreamO.isPresent()) {
                                                            Observable<JobSchedulingInfo> schedulingInfoObs = schedInfoStreamO
                                                                    .get();
                                                            Source<ServerSentEvent, NotUsed> schedInfoSource =
                                                                    Source.fromPublisher(
                                                                            RxReactiveStreams.toPublisher(
                                                                                    schedulingInfoObs))
                                                                          .map(j -> StreamingUtils.from(
                                                                                  j)
                                                                                                  .orElse(null))
                                                                          .filter(sse -> sse !=
                                                                                         null);
                                                            return completeOK(
                                                                    schedInfoSource,
                                                                    EventStreamMarshalling.toEventStream());
                                                        } else {
                                                            logger.warn(
                                                                    "Failed to get sched info stream for job {}",
                                                                    jobId);
                                                            return complete(
                                                                    StatusCodes.INTERNAL_SERVER_ERROR,
                                                                    "Failed to get sched info stream for job " +
                                                                    jobId);
                                                        }
                                                    });
                                        }))
                        ),
                        path(segment("namedjobs").slash(PathMatchers.segment()), (jobCluster) ->
                                parameterOptional(
                                        StringUnmarshallers.BOOLEAN,
                                        "sendHB",
                                        (sendHeartbeats) -> {
                                            logger.debug(
                                                    "/namedjobs/{} called",
                                                    jobCluster);
                                            jobClusterInfoStreamGET.increment();
                                            JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest req =
                                                    new JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest(
                                                            jobCluster);

                                            CompletionStage<JobDiscoveryRouteProto.JobClusterInfoResponse> jobClusterInfoRespCS =
                                                    jobDiscoveryRouteHandler.lastSubmittedJobIdStream(
                                                            req,
                                                            sendHeartbeats.orElse(false));
                                            return completeAsync(
                                                    jobClusterInfoRespCS,
                                                    r -> {
                                                        Optional<Observable<JobClusterInfo>> jobClusterInfoO = r
                                                                .getJobClusterInfoObs();
                                                        if (jobClusterInfoO.isPresent()) {
                                                            Observable<JobClusterInfo> jobClusterInfoObs = jobClusterInfoO
                                                                    .get();

                                                            Source<ServerSentEvent, NotUsed> source = Source
                                                                    .fromPublisher(RxReactiveStreams
                                                                                           .toPublisher(
                                                                                                   jobClusterInfoObs))
                                                                    .map(j -> StreamingUtils.from(j)
                                                                                            .orElse(null))
                                                                    .filter(sse -> sse != null);
                                                            return completeOK(
                                                                    source,
                                                                    EventStreamMarshalling.toEventStream());
                                                        } else {
                                                            logger.warn(
                                                                    "Failed to get last submitted jobId stream for {}",
                                                                    jobCluster);
                                                            return complete(
                                                                    StatusCodes.INTERNAL_SERVER_ERROR,
                                                                    "Failed to get last submitted jobId stream for " +
                                                                    jobCluster);
                                                        }
                                                    });
                                        })
                        ),
                        path(segment("jobScalerRules").slash(PathMatchers.segment()), (jobId) ->
                            parameterOptional(
                                StringUnmarshallers.BOOLEAN,
                                "sendHB",
                                (sendHeartbeats) -> {
                                    logger.debug(
                                        "/jobScalerRules/{} called",
                                        jobId);
                                    this.jobScalerRulesStreamGET.increment();
                                    JobClusterScalerRuleProto.GetJobScalerRuleStreamRequest req =
                                        new JobClusterScalerRuleProto.GetJobScalerRuleStreamRequest(
                                            JobId.fromId(jobId).orElse(null));

                                    CompletionStage<JobClusterScalerRuleProto.GetJobScalerRuleStreamResponse> jobScalerRulesRespCS =
                                        jobDiscoveryRouteHandler.jobScalerRuleStream(
                                            req,
                                            sendHeartbeats.orElse(false));
                                    return completeAsync(
                                        jobScalerRulesRespCS,
                                        r -> {
                                            Observable<JobScalerRuleInfo> jobScalerRulesInfoObs = r.getScalerRuleObs();
                                            if (jobScalerRulesInfoObs != null) {
                                                Source<ServerSentEvent, NotUsed> source = Source
                                                    .fromPublisher(RxReactiveStreams.toPublisher(jobScalerRulesInfoObs))
                                                    .map(j -> StreamingUtils.from(j).orElse(null))
                                                    .filter(Objects::nonNull);
                                                return completeOK(
                                                    source,
                                                    EventStreamMarshalling.toEventStream());
                                            } else {
                                                logger.warn(
                                                    "Failed to get job scaler rules stream for {}", jobId);
                                                return complete(
                                                    StatusCodes.INTERNAL_SERVER_ERROR,
                                                    "Failed to get job scaler rules stream for " + jobId);
                                            }
                                        });
                                })
                        )
                ))
        );
    }