private Route getJobRoutes()

in mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v0/JobRoute.java [190:372]


    /**
     * Builds the job routes served by this class.
     *
     * <p>Routes defined here:
     * <ul>
     *   <li>{@code POST STATUS_ENDPOINT} - parses a {@code PostJobStatusRequest}, converts it to a
     *       {@code WorkerEvent} and forwards it to {@code jobRouteHandler.workerStatus}. Heartbeat
     *       events are acknowledged with 200 OK without being forwarded when heartbeat processing
     *       is disabled in the configuration.</li>
     *   <li>{@code POST API_JOBS/KILL_ENDPOINT} - forwards a {@code KillJobRequest}; the response
     *       message is rewritten for backwards compatibility (see inline comments).</li>
     *   <li>{@code POST API_JOBS/RESUBMIT_WORKER_ENDPOINT} - forwards a
     *       {@code ResubmitWorkerRequest}.</li>
     *   <li>{@code POST API_JOBS/SCALE_STAGE_ENDPOINT} - forwards a {@code ScaleStageRequest}
     *       unless it asks for more workers than {@code getMaxWorkersPerStage()}.</li>
     *   <li>{@code GET API_JOBS/list}, {@code list/matchinglabels}, {@code list/<jobId>},
     *       {@code list/matching/<regex>}, {@code archived/<jobId>} and {@code archived}.</li>
     * </ul>
     *
     * <p>Every POST handler answers 400 BAD_REQUEST with a small JSON error body when the request
     * payload cannot be parsed (an {@code IOException} from {@code Jackson.fromJSON}).
     *
     * @return the combined route for all endpoints listed above
     */
    private Route getJobRoutes() {
        return route(
            path(STATUS_ENDPOINT, () ->
                post(() ->
                    decodeRequest(() ->
                        entity(Unmarshaller.entityToString(), req -> {
                            if (logger.isDebugEnabled()) {
                                logger.debug("/api/postjobstatus called {}", req);
                            }
                            try {
                                // Counted before parsing, so malformed payloads are counted too.
                                workerHeartbeatStatusPOST.increment();
                                PostJobStatusRequest postJobStatusRequest = Jackson.fromJSON(req, PostJobStatusRequest.class);
                                WorkerEvent workerStatusRequest = createWorkerStatusRequest(postJobStatusRequest);
                                if (workerStatusRequest instanceof WorkerHeartbeat) {
                                    if (!ConfigurationProvider.getConfig().isHeartbeatProcessingEnabled()) {
                                        // skip heartbeat processing
                                        if (logger.isTraceEnabled()) {
                                            logger.trace("skipped heartbeat event {}", workerStatusRequest);
                                        }
                                        workerHeartbeatSkipped.increment();
                                        return complete(StatusCodes.OK);
                                    }
                                }
                                return completeWithFuture(
                                    jobRouteHandler.workerStatus(workerStatusRequest)
                                        .thenApply(this::toHttpResponse));
                            } catch (IOException e) {
                                logger.warn("Error handling job status {}", req, e);
                                return complete(StatusCodes.BAD_REQUEST, "{\"error\": \"invalid JSON payload to post job status\"}");
                            }
                    })
                ))),
            pathPrefix(API_JOBS, () -> route(
                post(() -> route(
                    path(KILL_ENDPOINT, () ->
                        decodeRequest(() ->
                            entity(Unmarshaller.entityToString(), req -> {
                                logger.debug("/api/jobs/kill called {}", req);
                                try {
                                    final KillJobRequest killJobRequest = Jackson.fromJSON(req, KillJobRequest.class);
                                    return completeWithFuture(
                                        jobRouteHandler.kill(killJobRequest)
                                            .thenApply(resp -> {
                                                if (resp.responseCode == BaseResponse.ResponseCode.SUCCESS) {
                                                    // Replace the message with a one-element JSON array "<jobId> Killed".
                                                    return new JobClusterManagerProto.KillJobResponse(resp.requestId, resp.responseCode,
                                                        resp.getState(), "[\""+ resp.getJobId().getId() +" Killed\"]", resp.getJobId(), resp.getUser());
                                                } else if (resp.responseCode == BaseResponse.ResponseCode.CLIENT_ERROR) {
                                                    // for backwards compatibility with old master:
                                                    // a client error is reported as SUCCESS with the original message wrapped in an array
                                                    return new JobClusterManagerProto.KillJobResponse(resp.requestId, BaseResponse.ResponseCode.SUCCESS,
                                                        resp.getState(), "[\""+ resp.message +" \"]", resp.getJobId(), resp.getUser());
                                                }
                                                // Any other response code is passed through untouched.
                                                return resp;
                                            })
                                            .thenApply(this::toHttpResponse));
                                } catch (IOException e) {
                                    logger.warn("Error on job kill {}", req, e);
                                    return complete(StatusCodes.BAD_REQUEST, "{\"error\": \"invalid json payload to kill job\"}");
                                }
                            })
                    )),
                    path(RESUBMIT_WORKER_ENDPOINT, () ->
                        decodeRequest(() ->
                            entity(Unmarshaller.entityToString(), req -> {
                                logger.debug("/api/jobs/resubmitWorker called {}", req);
                                try {
                                    final ResubmitWorkerRequest resubmitWorkerRequest = Jackson.fromJSON(req, ResubmitWorkerRequest.class);
                                    return completeWithFuture(
                                        jobRouteHandler.resubmitWorker(resubmitWorkerRequest)
                                            .thenApply(this::toHttpResponse));
                                } catch (IOException e) {
                                    logger.warn("Error on worker resubmit {}", req, e);
                                    return complete(StatusCodes.BAD_REQUEST, "{\"error\": \"invalid json payload to resubmit worker\"}");
                                }
                            })
                    )),
                    path(SCALE_STAGE_ENDPOINT, () ->
                        decodeRequest(() ->
                            entity(Unmarshaller.entityToString(), req -> {
                                logger.debug("/api/jobs/scaleStage called {}", req);
                                try {
                                    ScaleStageRequest scaleStageRequest = Jackson.fromJSON(req, ScaleStageRequest.class);
                                    int numWorkers = scaleStageRequest.getNumWorkers();
                                    int maxWorkersPerStage = ConfigurationProvider.getConfig().getMaxWorkersPerStage();
                                    // NOTE(review): the check accepts numWorkers == maxWorkersPerStage although the
                                    // message below says "less than"; wording kept unchanged for existing clients.
                                    if (numWorkers > maxWorkersPerStage) {
                                        logger.warn("rejecting ScaleStageRequest {} with invalid num workers", scaleStageRequest);
                                        return complete(StatusCodes.BAD_REQUEST, "{\"error\": \"num workers must be less than " + maxWorkersPerStage + "\"}");
                                    }
                                    return completeWithFuture(
                                        jobRouteHandler.scaleStage(scaleStageRequest)
                                            .thenApply(this::toHttpResponse));
                                } catch (IOException e) {
                                    logger.warn("Error scaling stage {}", req, e);
                                    // The exception message may contain quotes, backslashes or line breaks;
                                    // escape it so the hand-built response body stays valid JSON.
                                    return complete(StatusCodes.BAD_REQUEST,
                                        "{\"error\": \"invalid json payload to scale stage " + escapeJsonText(e.getMessage()) +"\"}");
                                }
                            })
                    ))
// TODO                   path("updateScalingPolicy", () ->
//                        entity(Jackson.unmarshaller(UpdateJobClusterRequest.class), req -> {
//                            logger.info("/api/jobs/kill called {}", req);
//                            return completeWithFuture(
//                                jobRouteHandler.kill(req)
//                                    .thenApply(this::toHttpResponse));
//                        })
//                    )
                )),
                get(() -> route(
                    // Context from old mantis master:
                    // list all jobs activeOnly = true
                    // optional boolean 'compact' query param to return compact job infos if set
                    // For compact,
                    //  - optional 'limit' query param
                    //  - optional 'jobState' query param
                    // For non compact,
                    //  - optional boolean 'jobIdsOnly' query param to return only the job Ids if set
                    //  - optional int 'stageNumber' query param to filter for stage number
                    //  - optional int 'workerIndex' query param to filter for worker index
                    //  - optional int 'workerNumber' query param to filter for worker number
                    //  - optional int 'workerState' query param to filter for worker state
                    //
                    // list/all - list all jobs activeOnly=false with above query parameters
                    // list/matching/<regex> - if optional regex param specified, propagate regex
                    //                          else list all jobs activeOnly=false with above query parameters
                    // list/matchinglabels
                    //  - optional labels query param
                    //  - optional labels.op query param - default value is 'or' if not specified
                    //    (other possible value is 'and')
                    path(segment("list"), () -> {
                        jobListGET.increment();
                        return jobListRoute(Optional.empty());
                    }),
                    // Declared before list/<jobId> so that "matchinglabels" is not treated as a job id.
                    path(segment("list").slash("matchinglabels"), () -> {
                        jobListLabelMatchGET.increment();
                        return jobListRoute(Optional.empty());
                    }),
                    path(segment("list").slash(PathMatchers.segment()), (jobId) -> {
                        logger.debug("/api/jobs/list/{} called", jobId);
                        jobListJobIdGET.increment();
                        return completeAsync(
                            jobRouteHandler.getJobDetails(new JobClusterManagerProto.GetJobDetailsRequest("masterAPI", jobId)),
                            resp -> {
                                // Wrap the metadata (if any) in a view built with empty filter lists.
                                Optional<MantisJobMetadataView> mantisJobMetadataView = resp.getJobMetadata()
                                    .map(metaData -> new MantisJobMetadataView(metaData, Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), false));
                                return completeOK(mantisJobMetadataView,
                                    Jackson.marshaller());
                            });
                    }),
                    path(segment("list").slash("matching").slash(PathMatchers.segment()), (regex) -> {
                        jobListRegexGET.increment();
                        // An empty regex segment is treated the same as no regex at all.
                        return jobListRoute(Optional.ofNullable(regex)
                            .filter(r -> !r.isEmpty()));
                    }),
                    path(segment("archived").slash(PathMatchers.segment()), (jobId) ->
                        parameterOptional(StringUnmarshallers.INTEGER, "limit", (limit) -> {
                            jobArchivedWorkersGET.increment();
                            Optional<JobId> jobIdO = JobId.fromId(jobId);
                            if (jobIdO.isPresent()) {
                                // Fall back to the default limit when no 'limit' query param is given.
                                ListArchivedWorkersRequest req = new ListArchivedWorkersRequest(jobIdO.get(),
                                    limit.orElse(DEFAULT_LIST_ARCHIVED_WORKERS_LIMIT));
                                // Wrapped in alwaysCache with requestUriKeyer - presumably cached per
                                // request URI; confirm against the cache configuration.
                                return alwaysCache(cache, requestUriKeyer, () ->
                                    extractUri(uri -> completeAsync(
                                    jobRouteHandler.listArchivedWorkers(req),
                                    resp -> {
                                        List<MantisWorkerMetadataWritable> workers = resp.getWorkerMetadata().stream()
                                            .map(wm -> DataFormatAdapter.convertMantisWorkerMetadataToMantisWorkerMetadataWritable(wm))
                                            .collect(Collectors.toList());
                                        return completeOK(workers,
                                            Jackson.marshaller());
                                    })));
                            } else {
                                return complete(StatusCodes.BAD_REQUEST,
                                    "error: 'archived/<jobId>' request must include a valid jobId");
                            }
                        })
                    ),
                    // 'archived' without a job id segment is always rejected.
                    path(segment("archived"), () -> {
                        jobArchivedWorkersGETInvalid.increment();
                        return complete(StatusCodes.BAD_REQUEST,
                        "error: 'archived' Request must include jobId");
                    })
                )))
            ));
    }

    /**
     * Escapes text so it can be embedded between double quotes in a hand-built JSON string.
     *
     * <p>Double quotes, backslashes and control characters (below U+0020) are escaped; all other
     * characters are copied unchanged. A {@code null} input is rendered as the text {@code null},
     * matching what plain string concatenation would have produced.
     *
     * @param raw the text to escape, may be {@code null}
     * @return the escaped text, never {@code null}
     */
    private static String escapeJsonText(final String raw) {
        final String text = String.valueOf(raw);
        final StringBuilder escaped = new StringBuilder(text.length() + 16);
        for (int i = 0; i < text.length(); i++) {
            final char c = text.charAt(i);
            switch (c) {
                case '"':
                    escaped.append("\\\"");
                    break;
                case '\\':
                    escaped.append("\\\\");
                    break;
                case '\n':
                    escaped.append("\\n");
                    break;
                case '\r':
                    escaped.append("\\r");
                    break;
                case '\t':
                    escaped.append("\\t");
                    break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters must use the \\uXXXX form in JSON strings.
                        escaped.append(String.format("\\u%04x", (int) c));
                    } else {
                        escaped.append(c);
                    }
                    break;
            }
        }
        return escaped.toString();
    }