public void run()

in genie-web/src/main/java/com/netflix/genie/web/tasks/leader/AgentJobCleanupTask.java [90:164]


    public void run() {
        // Get agent jobs that are in an active status
        final Set<String> activeAgentJobIds = this.persistenceService.getActiveJobs();

        // Get agent jobs that are in ACCEPTED status (i.e. waiting for an agent to start and claim them)
        final Set<String> acceptedAgentJobIds = this.persistenceService.getUnclaimedJobs();

        // Keep only the active jobs whose agent is NOT currently connected
        final Set<String> currentlyAwolJobsIds = activeAgentJobIds
            .stream()
            .filter(jobId -> !this.agentRoutingService.isAgentConnected(jobId))
            .collect(Collectors.toSet());

        // Purge records of jobs that are no longer AWOL (agent reconnected, or job no longer active)
        this.awolJobsMap.entrySet().removeIf(awolJobEntry -> !currentlyAwolJobsIds.contains(awolJobEntry.getKey()));

        final Instant now = Instant.now();

        // Add records for any agent that was not previously AWOL; existing first-seen timestamps are kept
        currentlyAwolJobsIds.forEach(jobId -> this.awolJobsMap.putIfAbsent(jobId, now));

        // Evaluate every job whose agent is currently AWOL. Entries are dropped through the entry set's own
        // removal path (removeIf) rather than via this.awolJobsMap.remove(...) inside a for-each over the map,
        // which throws ConcurrentModificationException for non-concurrent maps such as HashMap.
        // An entry is dropped only once its job has been successfully marked failed.
        this.awolJobsMap.entrySet().removeIf(
            awolJobEntry -> this.evaluateAwolJob(
                awolJobEntry.getKey(),
                awolJobEntry.getValue(),
                acceptedAgentJobIds,
                now
            )
        );
    }

    /**
     * Check a single job whose agent is currently AWOL against its time limit, and mark the job failed if the
     * limit has been exceeded.
     *
     * <p>Unclaimed jobs are measured against the launch time limit; claimed jobs against the reconnect time limit.
     * Both limits are counted from the instant this task first observed the job's agent as AWOL.
     *
     * @param awolJobId           the id of the job whose agent is not connected
     * @param awolJobFirstSeen    the instant at which this task first recorded the job as AWOL
     * @param acceptedAgentJobIds ids of jobs still waiting to be claimed by an agent
     * @param now                 the instant used as the current time for this run of the task
     * @return {@code true} if the job was successfully marked failed (its AWOL record should be dropped),
     * {@code false} if it is still within its time limit or marking it failed did not succeed
     */
    private boolean evaluateAwolJob(
        final String awolJobId,
        final Instant awolJobFirstSeen,
        final Set<String> acceptedAgentJobIds,
        final Instant now
    ) {
        final boolean jobWasClaimed = !acceptedAgentJobIds.contains(awolJobId);
        final Instant claimDeadline = awolJobFirstSeen.plus(this.properties.getLaunchTimeLimit());
        final Instant reconnectDeadline = awolJobFirstSeen.plus(this.properties.getReconnectTimeLimit());

        if (!jobWasClaimed && now.isBefore(claimDeadline)) {
            log.debug("Job {} agent still pending agent start/claim", awolJobId);
            return false;
        }
        if (jobWasClaimed && now.isBefore(reconnectDeadline)) {
            log.debug("Job {} agent still disconnected", awolJobId);
            return false;
        }

        log.warn("Job {} agent AWOL for too long, marking failed", awolJobId);
        return this.markAwolJobFailed(awolJobId, jobWasClaimed);
    }

    /**
     * Mark a job whose agent has been AWOL for too long as failed, and record the outcome in the termination
     * counter.
     *
     * <p>If the job's archive status is still {@code PENDING}, it is first set to {@code UNKNOWN} (claimed job) or
     * {@code FAILED} (never claimed).
     *
     * @param awolJobId     the id of the job to mark failed
     * @param jobWasClaimed whether an agent had claimed the job; selects the archive status and status message
     * @return {@code true} if the job status was updated to failed, {@code false} if the job was not found or the
     * status update was rejected
     */
    private boolean markAwolJobFailed(final String awolJobId, final boolean jobWasClaimed) {
        try {
            final JobStatus currentStatus = this.persistenceService.getJobStatus(awolJobId);
            final ArchiveStatus archiveStatus = this.persistenceService.getJobArchiveStatus(awolJobId);

            // Update job archive status.
            // NOTE(review): this write happens before the job status update below; if that update throws,
            // the archive status change is not reverted here.
            if (archiveStatus == ArchiveStatus.PENDING) {
                this.persistenceService.updateJobArchiveStatus(
                    awolJobId,
                    jobWasClaimed ? ArchiveStatus.UNKNOWN : ArchiveStatus.FAILED
                );
            }

            // Mark the job as failed. The status read above is passed as the expected current status
            // (presumably a compare-and-set guard that raises GenieInvalidStatusException on mismatch -- confirm)
            this.persistenceService.updateJobStatus(
                awolJobId,
                currentStatus,
                JobStatus.FAILED,
                jobWasClaimed ? AWOL_STATUS_MESSAGE : NEVER_CLAIMED_STATUS_MESSAGE
            );

            // Increment counter, tag as successful
            this.registry.counter(
                TERMINATED_COUNTER_METRIC_NAME,
                MetricsUtils.newSuccessTagsSet()
            ).increment();
            return true;
        } catch (NotFoundException | GenieInvalidStatusException e) {
            log.warn("Failed to mark AWOL job {} as failed: ", awolJobId, e);
            // Increment counter, tag as failure
            this.registry.counter(
                TERMINATED_COUNTER_METRIC_NAME,
                MetricsUtils.newFailureTagsSetForException(e)
            ).increment();
            // Keep the AWOL record so the job is retried on the next run
            return false;
        }
    }