in genie-web/src/main/java/com/netflix/genie/web/tasks/leader/AgentJobCleanupTask.java [90:164]
public void run() {
    // Scan for agent jobs whose agent appears AWOL and fail those that exceeded their time limit.
    //
    // Jobs in an active status
    final Set<String> activeAgentJobIds = this.persistenceService.getActiveJobs();
    // Jobs in ACCEPTED status (i.e. waiting for an agent to start and claim them)
    final Set<String> acceptedAgentJobIds = this.persistenceService.getUnclaimedJobs();

    // An active job is considered AWOL when isAgentConnected(...) reports false for it
    final Set<String> currentlyAwolJobsIds = activeAgentJobIds
        .stream()
        .filter(jobId -> !this.agentRoutingService.isAgentConnected(jobId))
        .collect(Collectors.toSet());

    // Purge records of jobs that are no longer AWOL (agent connected, or job no longer active)
    this.awolJobsMap.entrySet().removeIf(awolJobEntry -> !currentlyAwolJobsIds.contains(awolJobEntry.getKey()));

    final Instant now = Instant.now();
    // Add records for newly AWOL jobs; putIfAbsent keeps the original first-seen time of known ones
    currentlyAwolJobsIds.forEach(jobId -> this.awolJobsMap.putIfAbsent(jobId, now));

    // Evaluate every AWOL job and drop the record of each job successfully marked FAILED.
    // Removal must go through the entry set's own iterator (removeIf): calling awolJobsMap.remove()
    // inside a for-each over entrySet() throws ConcurrentModificationException on a fail-fast map
    // such as HashMap, which would abort this run after the first terminated job.
    this.awolJobsMap.entrySet().removeIf(
        awolJobEntry -> this.failAwolJobIfExpired(
            awolJobEntry.getKey(),
            awolJobEntry.getValue(),
            acceptedAgentJobIds,
            now
        )
    );
}

/**
 * Check whether an AWOL job has exceeded its allowed time and, if so, attempt to mark it failed.
 * <p>
 * A job still present in {@code acceptedAgentJobIds} is treated as never claimed and is allowed
 * {@code properties.getLaunchTimeLimit()}; any other job is treated as claimed and is allowed
 * {@code properties.getReconnectTimeLimit()}. Both limits are measured from {@code awolJobFirstSeen}.
 *
 * @param awolJobId           id of the job whose agent is AWOL
 * @param awolJobFirstSeen    when this task first recorded the job as AWOL
 * @param acceptedAgentJobIds ids of jobs in ACCEPTED status (not yet claimed)
 * @param now                 reference time for this run
 * @return {@code true} if the job status was updated to FAILED and its AWOL record should be
 *     discarded; {@code false} if it is still within its time limit or the update did not succeed
 */
private boolean failAwolJobIfExpired(
    final String awolJobId,
    final Instant awolJobFirstSeen,
    final Set<String> acceptedAgentJobIds,
    final Instant now
) {
    final boolean jobWasClaimed = !acceptedAgentJobIds.contains(awolJobId);
    final Instant claimDeadline = awolJobFirstSeen.plus(this.properties.getLaunchTimeLimit());
    final Instant reconnectDeadline = awolJobFirstSeen.plus(this.properties.getReconnectTimeLimit());

    if (!jobWasClaimed && now.isBefore(claimDeadline)) {
        log.debug("Job {} agent still pending agent start/claim", awolJobId);
        return false;
    }
    if (jobWasClaimed && now.isBefore(reconnectDeadline)) {
        log.debug("Job {} agent still disconnected", awolJobId);
        return false;
    }

    log.warn("Job {} agent AWOL for too long, marking failed", awolJobId);
    return this.markAwolJobFailed(awolJobId, jobWasClaimed);
}

/**
 * Mark an AWOL job as FAILED and record the outcome in the terminated-jobs counter.
 * <p>
 * If the job's archive status is still PENDING it is first set to UNKNOWN (claimed job) or
 * FAILED (never claimed). The job status is then moved from its current value to FAILED with a
 * message that depends on whether the job was claimed.
 *
 * @param awolJobId     id of the job to fail
 * @param jobWasClaimed whether the job had left the ACCEPTED (unclaimed) set
 * @return {@code true} if the job status update succeeded; {@code false} if the job was not found
 *     or its status transition was rejected (the failure is logged and counted)
 */
private boolean markAwolJobFailed(final String awolJobId, final boolean jobWasClaimed) {
    try {
        final JobStatus currentStatus = this.persistenceService.getJobStatus(awolJobId);
        final ArchiveStatus archiveStatus = this.persistenceService.getJobArchiveStatus(awolJobId);
        // Only overwrite an archive status that is still PENDING
        if (archiveStatus == ArchiveStatus.PENDING) {
            this.persistenceService.updateJobArchiveStatus(
                awolJobId,
                jobWasClaimed ? ArchiveStatus.UNKNOWN : ArchiveStatus.FAILED
            );
        }
        // Transition from the status read above to FAILED; the persistence layer may reject this
        // with GenieInvalidStatusException (e.g. if the status changed in the meantime)
        this.persistenceService.updateJobStatus(
            awolJobId,
            currentStatus,
            JobStatus.FAILED,
            jobWasClaimed ? AWOL_STATUS_MESSAGE : NEVER_CLAIMED_STATUS_MESSAGE
        );
        // Increment counter, tag as successful
        this.registry.counter(
            TERMINATED_COUNTER_METRIC_NAME,
            MetricsUtils.newSuccessTagsSet()
        ).increment();
        return true;
    } catch (NotFoundException | GenieInvalidStatusException e) {
        log.warn("Failed to mark AWOL job {} as failed: ", awolJobId, e);
        // Increment counter, tag as failure; the record is kept and re-evaluated on the next run
        this.registry.counter(
            TERMINATED_COUNTER_METRIC_NAME,
            MetricsUtils.newFailureTagsSetForException(e)
        ).increment();
        return false;
    }
}