in mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/JobClustersRoute.java [382:463]
/**
 * Builds the route that serves {@code PUT /api/v1/jobClusters/{clusterName}}.
 *
 * <p>The request entity is unmarshalled into a {@link NamedJobDefinition} and then:
 * <ol>
 *   <li>rejected with {@code CLIENT_ERROR} when the payload or its job definition is missing;</li>
 *   <li>rejected with {@code CLIENT_ERROR} when the cluster name inside the payload differs
 *       from the {@code clusterName} path segment;</li>
 *   <li>rejected with {@code CLIENT_ERROR} when converting the payload raises an
 *       {@link IllegalArgumentException};</li>
 *   <li>otherwise forwarded to {@code jobClusterRouteHandler.update(...)}.</li>
 * </ol>
 *
 * <p>When the update reports a 2xx response code the current job cluster details are fetched
 * and returned; otherwise the update's request id, response code and message are propagated
 * with an empty job cluster.
 *
 * @param clusterName job cluster name taken from the resource path
 * @return the route completing the PUT request
 */
private Route putJobClusterInstanceRoute(String clusterName) {
    return entity(Jackson.unmarshaller(NamedJobDefinition.class), jobClusterDefn -> {
        logger.info("PUT /api/v1/jobClusters/{} called {}", clusterName, jobClusterDefn);

        CompletionStage<UpdateJobClusterResponse> updateResponse;
        try {
            if (jobClusterDefn == null || jobClusterDefn.getJobDefinition() == null) {
                // Validate BEFORE converting the payload.
                // NOTE(review): the adapter presumably dereferences the job definition; checking
                // first guarantees a missing definition yields CLIENT_ERROR instead of an
                // unhandled exception. No request exists yet, so the request id falls back to
                // 0L, the same value the IllegalArgumentException branch below uses.
                updateResponse = CompletableFuture.completedFuture(
                    new UpdateJobClusterResponse(
                        0L,
                        BaseResponse.ResponseCode.CLIENT_ERROR,
                        "Invalid request payload."));
            } else {
                // Convert exactly once; the same request object is used both for error
                // reporting and for the actual update call.
                final UpdateJobClusterRequest request = JobClusterProtoAdapter
                    .toUpdateJobClusterRequest(jobClusterDefn);
                final String payloadClusterName = jobClusterDefn.getJobDefinition().getName();

                if (!clusterName.equals(payloadClusterName)) {
                    // Cluster name in the request payload does not match the one given in
                    // the endpoint path segment.
                    updateResponse = CompletableFuture.completedFuture(
                        new UpdateJobClusterResponse(
                            request.requestId,
                            BaseResponse.ResponseCode.CLIENT_ERROR,
                            String.format(
                                "Cluster name specified in request payload %s " +
                                    "does not match with what specified in resource path %s",
                                payloadClusterName,
                                clusterName)));
                } else {
                    // Everything looks fine so far, process the request.
                    updateResponse = jobClusterRouteHandler.update(request);
                }
            }
        } catch (IllegalArgumentException ex) {
            // Conversion rejected the payload; report it as a client error.
            updateResponse = CompletableFuture.completedFuture(
                new UpdateJobClusterResponse(
                    0L,
                    BaseResponse.ResponseCode.CLIENT_ERROR,
                    "Invalid request payload: " + ex.getMessage()));
        }

        final CompletionStage<GetJobClusterResponse> response = updateResponse
            .thenCompose(updateResult -> {
                final int code = updateResult.responseCode.getValue();
                if (code >= 200 && code < 300) {
                    // Update succeeded: answer with the current state of the cluster.
                    return jobClusterRouteHandler.getJobClusterDetails(
                        new GetJobClusterRequest(clusterName));
                }
                // Update failed: carry its outcome over, without a job cluster body.
                final CompletableFuture<GetJobClusterResponse> failure =
                    CompletableFuture.completedFuture(
                        new JobClusterManagerProto.GetJobClusterResponse(
                            updateResult.requestId,
                            updateResult.responseCode,
                            updateResult.message,
                            Optional.empty()));
                return failure;
            });

        return completeAsync(
            response,
            resp -> {
                // Only the status of the default response is reused; the body is the
                // job cluster marshalled with Jackson.
                HttpResponse httpResponse = this.toDefaultHttpResponse(resp);
                return complete(
                    httpResponse.status(),
                    resp.getJobCluster(),
                    Jackson.marshaller());
            },
            HttpRequestMetrics.Endpoints.JOB_CLUSTER_INSTANCE,
            HttpRequestMetrics.HttpVerb.PUT);
    });
}