private Route getJobClusterRoutes()

in mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v0/JobClusterRoute.java [312:597]


    /**
     * Builds the route tree for the job cluster endpoints handled by this class.
     *
     * <p>The tree consists of:
     * <ul>
     *   <li>{@code api/submit} - parses a {@code MantisJobDefinition}, validates it and submits a job.
     *       No HTTP-method directive is applied to this path inside this method.</li>
     *   <li>POST routes under the {@code API_V0_JOBCLUSTER} prefix: {@code create}, {@code update},
     *       {@code delete}, {@code disable}, {@code enable}, {@code quickupdate}, {@code updatelabels},
     *       {@code updatesla}, {@code migratestrategy} and {@code quicksubmit}.</li>
     *   <li>GET routes under the same prefix: {@code list}, {@code list/<name>},
     *       {@code listJobIds/<name>} and {@code listJobIds} (which always answers 400 asking for a
     *       cluster name).</li>
     * </ul>
     *
     * <p>Every POST handler reads the request entity as a string, deserializes it with
     * {@code Jackson.fromJSON}, forwards the resulting request object to {@code jobClusterRouteHandler}
     * and converts the asynchronous response with {@code toHttpResponse}. A body that cannot be
     * deserialized ({@link IOException}) is answered with {@code 400 Bad Request}.
     *
     * <p>NOTE(review): the log messages suggest {@code API_V0_JOBCLUSTER} resolves to
     * {@code /api/namedjob}; the constant is declared outside this method - confirm.
     *
     * @return the combined route for all job cluster endpoints listed above
     */
    private Route getJobClusterRoutes() {
        return route(
            // Job submission from a full job definition.
            path(segment("api").slash("submit"), () ->
                decodeRequest(() ->
                    entity(Unmarshaller.entityToString(), request -> {
                        logger.debug("/api/submit called {}", request);
                        try {
                            MantisJobDefinition mjd = Jackson.fromJSON(request, MantisJobDefinition.class);
                            logger.debug("job submit request {}", mjd);
                            mjd.validate(true);

                            Pair<Boolean, String> validationResult = validateSubmitJobRequest(mjd);
                            if (!validationResult.first()) {
                                jobClusterSubmitError.increment();
                                return complete(StatusCodes.BAD_REQUEST,
                                    "{\"error\": \"" + validationResult.second() + "\"}");
                            }
                            jobClusterSubmit.increment();
                            return completeWithFuture(
                                jobClusterRouteHandler.submit(JobClusterProtoAdapter.toSubmitJobClusterRequest(mjd))
                                    .thenApply(this::toHttpResponse));
                        } catch (Exception e) {
                            // Any failure here (parsing, validate(), adapter conversion) is reported as a 500.
                            logger.warn("exception in submit job request {}", request, e);
                            jobClusterSubmitError.increment();
                            return complete(StatusCodes.INTERNAL_SERVER_ERROR,
                                "{\"error\": \""+e.getMessage()+ "\"}");
                        }
                    })
                )
            ),
            pathPrefix(API_V0_JOBCLUSTER, () -> route(
                post(() -> route(
                    path("create", () ->
                        decodeRequest(() ->
                            entity(Unmarshaller.entityToString(), jobClusterDefn -> {
                                logger.debug("/api/namedjob/create called {}", jobClusterDefn);
                                try {
                                    final NamedJobDefinition namedJobDefinition = Jackson.fromJSON(jobClusterDefn, NamedJobDefinition.class);
                                    // A usable definition needs a job definition with a jar location and a non-empty name.
                                    if (namedJobDefinition == null ||
                                        namedJobDefinition.getJobDefinition() == null ||
                                        namedJobDefinition.getJobDefinition().getJobJarFileLocation() == null ||
                                        namedJobDefinition.getJobDefinition().getName() == null ||
                                        namedJobDefinition.getJobDefinition().getName().isEmpty()) {
                                        logger.warn("JobCluster create request must include name and URL {}", jobClusterDefn);
                                        // Count rejected requests as errors, matching the other failure paths of this endpoint.
                                        jobClusterCreateError.increment();
                                        return complete(StatusCodes.BAD_REQUEST, "{\"error\": \"Job definition must include name and URL\"}");
                                    }
                                    final CompletionStage<CreateJobClusterResponse> response =
                                        jobClusterRouteHandler.create(
                                            JobClusterProtoAdapter.toCreateJobClusterRequest(namedJobDefinition));
                                    jobClusterCreate.increment();
                                    return completeWithFuture(response
                                        .thenApply(r -> {
                                            // A client error (or conflict) whose message contains "already exists" is
                                            // re-labelled as SERVER_ERROR before being turned into an HTTP response.
                                            // NOTE(review): presumably kept for compatibility with existing v0 clients - confirm.
                                            if ((r.responseCode == CLIENT_ERROR || r.responseCode == CLIENT_ERROR_CONFLICT)
                                                && r.message.contains("already exists")) {
                                                return new CreateJobClusterResponse(r.requestId, SERVER_ERROR, r.message, r.getJobClusterName());
                                            }
                                            return r;
                                        })
                                        .thenApply(this::toHttpResponse));
                                } catch (IOException e) {
                                    logger.warn("Error creating JobCluster {}", jobClusterDefn, e);
                                    jobClusterCreateError.increment();
                                    return complete(StatusCodes.BAD_REQUEST, "Can't read valid json in request: "+e.getMessage());
                                } catch (Exception e) {
                                    logger.warn("Error creating JobCluster {}", jobClusterDefn, e);
                                    jobClusterCreateError.increment();
                                    // The message is quoted so the body is a JSON object with a string value.
                                    return complete(StatusCodes.INTERNAL_SERVER_ERROR, "{\"error\": \"" + e.getMessage() + "\"}");
                                }
                            })
                        )
                    ),
                    path("update", () ->
                        decodeRequest(() ->
                            entity(Unmarshaller.entityToString(), jobClusterDefn -> {
                                logger.debug("/api/namedjob/update called {}", jobClusterDefn);
                                try {
                                    final NamedJobDefinition namedJobDefinition = Jackson.fromJSON(jobClusterDefn, NamedJobDefinition.class);
                                    // Same minimum requirements as for create: jar location and non-empty name.
                                    if (namedJobDefinition == null ||
                                        namedJobDefinition.getJobDefinition() == null ||
                                        namedJobDefinition.getJobDefinition().getJobJarFileLocation() == null ||
                                        namedJobDefinition.getJobDefinition().getName() == null ||
                                        namedJobDefinition.getJobDefinition().getName().isEmpty()) {
                                        logger.warn("JobCluster update request must include name and URL {}", jobClusterDefn);
                                        jobClusterCreateUpdateError.increment();
                                        return complete(StatusCodes.BAD_REQUEST, "{\"error\": \"Job definition must include name and URL\"}");
                                    }
                                    final CompletionStage<UpdateJobClusterResponse> response =
                                        jobClusterRouteHandler.update(
                                            JobClusterProtoAdapter.toUpdateJobClusterRequest(namedJobDefinition));
                                    jobClusterCreateUpdate.increment();
                                    return completeWithFuture(response.thenApply(this::toHttpResponse));
                                } catch (IOException e) {
                                    logger.warn("Error updating JobCluster {}", jobClusterDefn, e);
                                    jobClusterCreateUpdateError.increment();
                                    return complete(StatusCodes.BAD_REQUEST, "Can't read valid json in request: "+e.getMessage());
                                } catch (Exception e) {
                                    logger.warn("Error updating JobCluster {}", jobClusterDefn, e);
                                    jobClusterCreateUpdateError.increment();
                                    // The message is quoted so the body is a JSON object with a string value.
                                    return complete(StatusCodes.INTERNAL_SERVER_ERROR, "{\"error\": \"" + e.getMessage() + "\"}");
                                }
                            })
                        )
                    ),
                    path("delete", () ->
                        decodeRequest(() ->
                            entity(Unmarshaller.entityToString(), deleteReq -> {
                                logger.debug("/api/namedjob/delete called {}", deleteReq);
                                try {
                                    final DeleteJobClusterRequest deleteJobClusterRequest = Jackson.fromJSON(deleteReq, DeleteJobClusterRequest.class);
                                    final CompletionStage<DeleteJobClusterResponse> response =
                                        jobClusterRouteHandler.delete(deleteJobClusterRequest);
                                    jobClusterDelete.increment();
                                    return completeWithFuture(response.thenApply(this::toHttpResponse));
                                } catch (IOException e) {
                                    logger.warn("Error deleting JobCluster {}", deleteReq, e);
                                    jobClusterDeleteError.increment();
                                    return complete(StatusCodes.BAD_REQUEST, "Can't find valid json in request: " + e.getMessage());
                                }
                            })
                        )
                    ),
                    path("disable", () ->
                        decodeRequest(() ->
                            entity(Unmarshaller.entityToString(), request -> {
                                logger.debug("/api/namedjob/disable called {}", request);
                                try {
                                    final DisableJobClusterRequest disableJobClusterRequest = Jackson.fromJSON(request, DisableJobClusterRequest.class);
                                    final CompletionStage<DisableJobClusterResponse> response =
                                        jobClusterRouteHandler.disable(disableJobClusterRequest);
                                    jobClusterDisable.increment();
                                    return completeWithFuture(response.thenApply(this::toHttpResponse));
                                } catch (IOException e) {
                                    logger.warn("Error disabling JobCluster {}", request, e);
                                    jobClusterDisableError.increment();
                                    return complete(StatusCodes.BAD_REQUEST, "Can't find valid json in request: " + e.getMessage());
                                }
                            })
                        )
                    ),
                    path("enable", () ->
                        decodeRequest(() ->
                            entity(Unmarshaller.entityToString(), request -> {
                                logger.debug("/api/namedjob/enable called {}", request);
                                try {
                                    final EnableJobClusterRequest enableJobClusterRequest = Jackson.fromJSON(request, EnableJobClusterRequest.class);
                                    final CompletionStage<EnableJobClusterResponse> response =
                                        jobClusterRouteHandler.enable(enableJobClusterRequest);
                                    jobClusterEnable.increment();
                                    return completeWithFuture(response.thenApply(this::toHttpResponse));
                                } catch (IOException e) {
                                    logger.warn("Error enabling JobCluster {}", request, e);
                                    jobClusterEnableError.increment();
                                    return complete(StatusCodes.BAD_REQUEST, "Can't find valid json in request: " + e.getMessage());
                                }
                            })
                        )
                    ),
                    path("quickupdate", () ->
                        decodeRequest(() ->
                            entity(Unmarshaller.entityToString(), request -> {
                                logger.debug("/api/namedjob/quickupdate called {}", request);
                                try {
                                    final UpdateJobClusterArtifactRequest updateJobClusterArtifactRequest = Jackson.fromJSON(request, UpdateJobClusterArtifactRequest.class);
                                    final CompletionStage<UpdateJobClusterArtifactResponse> response =
                                        jobClusterRouteHandler.updateArtifact(updateJobClusterArtifactRequest);
                                    jobClusterQuickupdate.increment();
                                    return completeWithFuture(response.thenApply(this::toHttpResponse));
                                } catch (IOException e) {
                                    logger.warn("Error on quickupdate for JobCluster {}", request, e);
                                    jobClusterQuickupdateError.increment();
                                    return complete(StatusCodes.BAD_REQUEST, "Can't find valid json in request: " + e.getMessage());
                                }
                            })
                        )
                    ),
                    path("updatelabels", () ->
                        decodeRequest(() ->
                            entity(Unmarshaller.entityToString(), request ->  {
                                logger.debug("/api/namedjob/updatelabels called {}", request);
                                try {
                                    final UpdateJobClusterLabelsRequest updateJobClusterLabelsRequest = Jackson.fromJSON(request, UpdateJobClusterLabelsRequest.class);
                                    jobClusterUpdateLabel.increment();
                                    return completeWithFuture(jobClusterRouteHandler.updateLabels(updateJobClusterLabelsRequest)
                                        .thenApply(this::toHttpResponse));
                                } catch (IOException e) {
                                    logger.warn("Error updating labels for JobCluster {}", request, e);
                                    jobClusterUpdateLabelError.increment();
                                    return complete(StatusCodes.BAD_REQUEST, "Can't find valid json in request: " + e.getMessage());
                                }
                            })
                        )
                    ),
                    path("updatesla", () ->
                        decodeRequest(() ->
                            entity(Unmarshaller.entityToString(), request -> {
                                logger.debug("/api/namedjob/updatesla called {}", request);
                                // Unlike the sibling endpoints, this counter is bumped before the body is parsed,
                                // so it also counts requests that are later rejected as malformed.
                                jobClusterUpdateSla.increment();
                                try {
                                    final UpdateJobClusterSLARequest updateJobClusterSLARequest = Jackson.fromJSON(request, UpdateJobClusterSLARequest.class);
                                    return completeWithFuture(jobClusterRouteHandler.updateSLA(updateJobClusterSLARequest)
                                        .thenApply(this::toHttpResponse));
                                } catch (IOException e) {
                                    logger.warn("Error updating SLA for JobCluster {}", request, e);
                                    jobClusterUpdateSlaError.increment();
                                    return complete(StatusCodes.BAD_REQUEST, "Can't find valid json in request: " + e.getMessage());
                                }
                            })
                        )
                    ),
                    // No request/error counters are recorded for migratestrategy and quicksubmit.
                    path("migratestrategy", () ->
                        decodeRequest(() ->
                            entity(Unmarshaller.entityToString(), request -> {
                                logger.debug("/api/namedjob/migratestrategy called {}", request);
                                try {
                                    final UpdateJobClusterWorkerMigrationStrategyRequest updateMigrateStrategyReq =
                                        Jackson.fromJSON(request, UpdateJobClusterWorkerMigrationStrategyRequest.class);
                                    return completeWithFuture(jobClusterRouteHandler.updateWorkerMigrateStrategy(updateMigrateStrategyReq)
                                        .thenApply(this::toHttpResponse));
                                } catch (IOException e) {
                                    logger.warn("Error updating migrate strategy for JobCluster {}", request, e);
                                    return complete(StatusCodes.BAD_REQUEST, "Can't find valid json in request: " + e.getMessage());
                                }
                            })
                        )
                    ),
                    path("quicksubmit", () ->
                        decodeRequest(() ->
                            entity(Unmarshaller.entityToString(), request -> {
                                logger.debug("/api/namedjob/quicksubmit called {}", request);
                                try {
                                    final JobClusterManagerProto.SubmitJobRequest submitJobRequest = Jackson.fromJSON(request, JobClusterManagerProto.SubmitJobRequest.class);
                                    return completeWithFuture(jobClusterRouteHandler.submit(submitJobRequest)
                                        .thenApply(this::toHttpResponse));
                                } catch (IOException e) {
                                    logger.warn("Error on quick submit for JobCluster {}", request, e);
                                    return complete(StatusCodes.BAD_REQUEST, "Can't find valid json in request: " + e.getMessage());
                                }
                            })
                        )
                    )
                )),
                get(() -> route(
                    pathPrefix("list", () -> route(
                        // "list" or "list/": all job clusters, served through alwaysCache keyed by requestUriKeyer.
                        pathEndOrSingleSlash(() -> {
                            logger.debug("/api/namedjob/list called");
                            jobClusterListGET.increment();
                            return alwaysCache(cache, requestUriKeyer, () ->
                                extractUri(uri -> completeAsync(
                                    jobClusterRouteHandler.getAllJobClusters(new ListJobClustersRequest()),
                                    resp -> completeOK(
                                        resp.getJobClusters()
                                            .stream()
                                            .map(jobClusterMetadataView -> JobClusterProtoAdapter.toJobClusterInfo(jobClusterMetadataView))
                                            .collect(Collectors.toList())
                                            ,
                                        Jackson.marshaller()),
                                    // NOTE(review): presumably the non-success callback of completeAsync; it still
                                    // answers 200 with an empty list - confirm against completeAsync.
                                    resp -> completeOK(Collections.emptyList(), Jackson.marshaller()))));
                        }),
                        // "list/<name>": details of one cluster, returned as a list of zero or one element.
                        path(PathMatchers.segment(), (jobCluster) -> {
                            if (logger.isDebugEnabled()) {
                                logger.debug("/api/namedjob/list/{} called", jobCluster);
                            }
                            jobClusterListClusterGET.increment();
                            return completeAsync(
                                jobClusterRouteHandler.getJobClusterDetails(new JobClusterManagerProto.GetJobClusterRequest(jobCluster)),
                                resp -> completeOK(
                                    resp.getJobCluster().map(jc -> Arrays.asList(jc)).orElse(Collections.emptyList()),
                                    Jackson.marshaller()),
                                resp -> completeOK(Collections.emptyList(), Jackson.marshaller())
                            );
                        })
                    )),
                    path(segment("listJobIds").slash(PathMatchers.segment()), (jobCluster) -> {
                        logger.debug("/api/namedjob/listJobIds/{} called", jobCluster);
                        jobClusterListJobIdGET.increment();
                        return jobClusterListRoute(jobCluster);
                    }),
                    // "listJobIds" without a cluster name is rejected with a hint about the expected URL.
                    path("listJobIds", () ->
                    {
                        logger.debug("/api/namedjob/listJobIds called");
                        return complete(StatusCodes.BAD_REQUEST,
                            "Specify the Job cluster name '/api/namedjob/listJobIds/<JobClusterName>' to list the job Ids");
                    })
                )))
            ));
    }