in mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/JobClusterActor.java [1817:1895]
/**
 * Handles the outcome of a kill-job request.
 *
 * <p>On {@code SUCCESS} for a job that {@code jobManager} still tracks as non-terminal, this stops
 * watching the job actor, increments the shutdown counter, relays the response to the original
 * requestor, marks the job completed and then triggers SLA enforcement. If the job is not tracked,
 * a {@code CLIENT_ERROR} / "Job not found" response is relayed instead. For any other response
 * code the response is relayed unchanged.
 *
 * <p>Responses are never relayed when the requestor is {@code null} or is this actor itself.
 *
 * @param resp the kill-job response to process
 */
public void onKillJobResponse(JobClusterProto.KillJobResponse resp) {
    if (logger.isTraceEnabled()) { logger.trace("Enter onKillJobResponse {}", resp); }

    // check requestor is not self to avoid an infinite loop
    final boolean relayToRequestor = resp.requestor != null && !getSelf().equals(resp.requestor);

    if (resp.responseCode == SUCCESS) {
        Optional<JobInfo> jInfo = jobManager.getJobInfoForNonTerminalJob(resp.jobId);
        if (jInfo.isPresent()) {
            // stop watching actor
            getContext().unwatch(jInfo.get().jobActor);
            numJobShutdowns.increment();
            logger.info("Marking job {} as terminated", jInfo.get().jobId);
            if (relayToRequestor) {
                resp.requestor.tell(
                    new KillJobResponse(resp.requestId, resp.responseCode, resp.state, resp.message, resp.jobId, resp.user),
                    getSelf());
            }
            try {
                Optional<CompletedJob> completedJob = jobManager.markCompleted(resp.jobMetadata);
                if (completedJob.isPresent()) {
                    logger.info("In cleanupAfterJobKill for Job {} in state {} and metadata {} ", resp.jobId, resp.state,resp.jobMetadata);
                    enforceSlaAfterJobKill(resp, completedJob.get());
                } else {
                    logger.warn("Unable to mark job {} completed. ", resp.jobId);
                }
            } catch (IOException e) {
                logger.error("Unable to mark job {} completed. ", resp.jobId, e);
            }
        } else {
            // should not get here
            if (relayToRequestor) {
                resp.requestor.tell(
                    new KillJobResponse(resp.requestId, CLIENT_ERROR, JobState.Noop, "Job not found", resp.jobId, resp.user),
                    getSelf());
            }
        }
    } else if (relayToRequestor) {
        // kill job was not successful relay to caller
        resp.requestor.tell(
            new KillJobResponse(resp.requestId, resp.responseCode, resp.state, resp.message, resp.jobId, resp.user),
            getSelf());
    }

    if (logger.isTraceEnabled()) { logger.trace("Exit onKillJobResponse {}", resp); }
}

/**
 * Sends this actor an {@code EnforceSLARequest} based on the job that was just killed.
 *
 * <p>Nothing is sent when the cluster is disabled or when the SLA has both min and max equal to 0.
 * The job definition is taken from the metadata carried by {@code resp}; when that metadata is
 * absent, the archived job is looked up in {@code jobStore} instead.
 *
 * @param resp         the kill-job response being processed
 * @param completedJob the completed-job record returned by {@code jobManager.markCompleted}
 */
private void enforceSlaAfterJobKill(JobClusterProto.KillJobResponse resp, CompletedJob completedJob) {
    if (jobClusterMetadata.isDisabled()) {
        return;
    }
    SLA sla = this.jobClusterMetadata.getJobClusterDefinition().getSLA();
    if (sla.getMin() == 0 && sla.getMax() == 0) {
        logger.info("{} No SLA specified nothing to enforce {}",
            completedJob.getJobId(), sla);
        return;
    }
    try {
        // first check if response has job meta for last job.
        // ofNullable (not of): a null jobMetadata must fall through to the archive lookup
        // instead of throwing a NullPointerException.
        Optional<IMantisJobMetadata> cJob = Optional.ofNullable(resp.jobMetadata);
        if (!cJob.isPresent()) {
            // else check archived jobs
            cJob = jobStore.getArchivedJob(completedJob.getJobId());
        }
        // null guard kept for the store result, whose implementation is not visible here
        if (cJob != null && cJob.isPresent()) {
            getSelf().tell(new JobClusterProto.EnforceSLARequest(Instant.now(), of(cJob.get().getJobDefinition())), ActorRef.noSender());
        } else {
            logger.warn("Could not load last terminated job to use for triggering enforce SLA");
        }
    } catch (Exception e) {
        // should not get here
        logger.warn("Exception {} loading completed Job {} to enforce SLA due", e.getMessage(), completedJob.getJobId(), e);
    }
}