in genie-web/src/main/java/com/netflix/genie/web/agent/apis/rpc/v4/endpoints/GRpcJobKillServiceImpl.java [117:218]
public void killJob(
    final String jobId,
    final String reason,
    @Nullable final HttpServletRequest request
) throws GenieJobNotFoundException, GenieServerException {
    // Dispatch on the job's current status:
    //  - finished: nothing to do
    //  - not yet claimed by an agent: mark KILLED directly so an agent should not start it
    //  - active: an agent has come up and connected to some Genie server (possibly this one), so either
    //    notify the locally connected agent or forward the request to the host the agent is connected to
    //  - anything else: unhandled, throw so the caller can try again
    // NOTE: the "before claimed" check must stay ahead of isActive() so unclaimed jobs take the direct path.
    final JobStatus currentJobStatus = this.lookupJobStatusForKill(jobId);
    if (currentJobStatus.isFinished()) {
        log.info("Job {} was already finished when the kill request arrived. Nothing to do.", jobId);
    } else if (JobStatus.getStatusesBeforeClaimed().contains(currentJobStatus)) {
        this.killJobBeforeClaim(jobId, currentJobStatus, reason);
    } else if (currentJobStatus.isActive()) {
        if (this.agentRoutingService.isAgentConnectionLocal(jobId)) {
            this.killJobConnectedLocally(jobId, currentJobStatus, reason);
        } else {
            this.forwardKillToAgentHost(jobId, request);
        }
    } else {
        // The job is in some unknown state; throw an exception that forces the method to be tried again
        log.error("{} is an unhandled state for job {}", currentJobStatus, jobId);
        throw new GenieServerException(
            "Job " + jobId + " is currently in " + currentJobStatus + " status, which isn't currently handled"
        );
    }
}

/**
 * Read the current status of the job that is about to be killed.
 *
 * @param jobId The id of the job
 * @return The job's current status as reported by the persistence service
 * @throws GenieJobNotFoundException If the persistence service reports no such job
 * @throws GenieServerException      Declared for parity with {@code killJob}
 */
private JobStatus lookupJobStatusForKill(
    final String jobId
) throws GenieJobNotFoundException, GenieServerException {
    try {
        return this.persistenceService.getJobStatus(jobId);
    } catch (final NotFoundException e) {
        throw new GenieJobNotFoundException(e);
    }
}

/**
 * Kill a job that no agent has claimed yet by moving its status straight to {@link JobStatus#KILLED}.
 * Setting the status before an agent claims the job should prevent an agent from starting it.
 *
 * @param jobId            The id of the job
 * @param currentJobStatus The status the job was observed in; used as the expected status for the update
 * @param reason           The reason recorded with the status change
 * @throws GenieJobNotFoundException If the persistence service reports no such job
 * @throws GenieServerException      Declared for parity with {@code killJob}
 */
private void killJobBeforeClaim(
    final String jobId,
    final JobStatus currentJobStatus,
    final String reason
) throws GenieJobNotFoundException, GenieServerException {
    try {
        this.persistenceService.updateJobStatus(jobId, currentJobStatus, JobStatus.KILLED, reason);
    } catch (final GenieInvalidStatusException e) {
        // Somewhere else in the system the status was changed before we could kill. The entire kill
        // should be retried, as the job may have transitioned to a finished state. The exception is
        // passed to the logger so its cause/stack trace is not lost.
        log.error(
            "Unable to set job status for {} to {} due to current status not being expected {}",
            jobId,
            JobStatus.KILLED,
            currentJobStatus,
            e
        );
        throw e;
    } catch (final NotFoundException e) {
        throw new GenieJobNotFoundException(e);
    }
}

/**
 * Kill an active job whose agent connection is expected to be on this server.
 * If a parked kill response observer exists for the job, it is removed and completed with a kill response,
 * which notifies the agent. Otherwise the job status is forced to {@link JobStatus#KILLED}.
 *
 * @param jobId            The id of the job
 * @param currentJobStatus The status the job was observed in
 * @param reason           The reason recorded if the status has to be forced
 * @throws GenieJobNotFoundException If a forced status update reports no such job
 * @throws GenieServerException      Declared for parity with {@code killJob}
 */
private void killJobConnectedLocally(
    final String jobId,
    final JobStatus currentJobStatus,
    final String reason
) throws GenieJobNotFoundException, GenieServerException {
    // remove() so the observer is used at most once for this job
    final StreamObserver<JobKillRegistrationResponse> responseObserver
        = this.parkedJobKillResponseObservers.remove(jobId);
    if (responseObserver == null) {
        this.forceKillJobWithMissingAgent(jobId, currentJobStatus, reason);
        return;
    }
    responseObserver.onNext(JobKillRegistrationResponse.newBuilder().build());
    responseObserver.onCompleted();
    log.info("Agent notified for killing job {}", jobId);
}

/**
 * Force a job to {@link JobStatus#KILLED} when its agent was expected to be connected locally but no parked
 * response observer was found. This might happen when the agent has gone but its status was not updated.
 *
 * @param jobId            The id of the job
 * @param currentJobStatus The status the job was observed in; used as the expected status for the update
 * @param reason           The reason recorded with the status change
 * @throws GenieJobNotFoundException If the persistence service reports no such job
 * @throws GenieServerException      Declared for parity with {@code killJob}
 */
private void forceKillJobWithMissingAgent(
    final String jobId,
    final JobStatus currentJobStatus,
    final String reason
) throws GenieJobNotFoundException, GenieServerException {
    log.warn("Tried to kill Job {}, but expected local agent connection not found. "
            + "Trying to force updating the job status to {}",
        jobId,
        JobStatus.KILLED
    );
    try {
        this.persistenceService.updateJobStatus(jobId, currentJobStatus, JobStatus.KILLED, reason);
        log.info("Succeeded to force updating the status of Job {} to {}",
            jobId,
            JobStatus.KILLED
        );
    } catch (final GenieInvalidStatusException e) {
        // Pass the exception as the trailing argument so SLF4J records its cause/stack trace
        log.error(
            "Failed to force updating the status of Job {} to {} "
                + "due to current status not being expected {}",
            jobId,
            JobStatus.KILLED,
            currentJobStatus,
            e
        );
        throw e;
    } catch (final NotFoundException e) {
        log.error(
            "Failed to force updating the status of Job {} to {} due to job not found",
            jobId,
            JobStatus.KILLED,
            e
        );
        throw new GenieJobNotFoundException(e);
    }
}

/**
 * Forward the kill request to the Genie host where the job's agent is currently connected.
 *
 * @param jobId   The id of the job
 * @param request The originating HTTP request, if any, handed to the forwarding service unchanged
 * @throws GenieJobNotFoundException Declared for parity with {@code killJob}
 * @throws GenieServerException      If no host could be found for the agent connection
 */
private void forwardKillToAgentHost(
    final String jobId,
    @Nullable final HttpServletRequest request
) throws GenieJobNotFoundException, GenieServerException {
    final String hostname = this.agentRoutingService
        .getHostnameForAgentConnection(jobId)
        .orElseThrow(
            // Note: this should retry as we may have hit a case where the agent is transitioning between
            // the nodes it is connected to
            () -> new GenieServerException(
                "Unable to locate host where agent is connected for job " + jobId
            )
        );
    log.info(
        "Agent for job {} currently connected to {}. Attempting to forward kill request",
        jobId,
        hostname
    );
    this.requestForwardingService.kill(hostname, jobId, request);
}