public void checkHeartBeats()

in mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java [1988:2089]


        /**
         * Scans every worker of every stage and resubmits the ones that look unhealthy.
         *
         * <p>Two tolerances are derived from the configured worker timeout:
         * <ul>
         *   <li>{@code missedHeartBeatToleranceSecs = (long) (1.5 * getWorkerTimeoutSecs())}, applied to launched
         *   workers (time since last heartbeat, or since launch if no heartbeat has been seen yet);</li>
         *   <li>{@code stuckInSubmitToleranceSecs = missedHeartBeatToleranceSecs + workerInitTimeoutSecs}, applied
         *   to workers that have not launched yet (time since they were accepted).</li>
         * </ul>
         * Workers selected for resubmission are collected first and resubmitted after the scan; a failure to
         * resubmit one worker is logged and does not prevent the others from being resubmitted. Finally
         * {@code migrateDisabledVmWorkers(currentTime)} is invoked.
         *
         * @param currentTime the instant treated as "now" for every age computation in this check
         */
        public void checkHeartBeats(Instant currentTime) {
            LOGGER.debug("Using worker timeout {} for job {}", getWorkerTimeoutSecs(), this.jobMgr.getJobId());
            // heartbeat misses are calculated as 3 * heartbeatInterval, pick 1.5 multiplier for this check interval
            long missedHeartBeatToleranceSecs = (long) (1.5 * getWorkerTimeoutSecs());
            // Allow more time for workers to start
            long stuckInSubmitToleranceSecs =
                missedHeartBeatToleranceSecs + ConfigurationProvider.getConfig().getWorkerInitTimeoutSecs();

            List<JobWorker> workersToResubmit = Lists.newArrayList();
            // expire worker resubmit entries
            resubmitRateLimiter.expireResubmitRecords(currentTime.toEpochMilli());
            for (IMantisStageMetadata stage : mantisJobMetaData.getStageMetadata().values()) {
                for (JobWorker worker : stage.getAllWorkers()) {
                    IMantisWorkerMetadata workerMeta = worker.getMetadata();
                    boolean resubmit;
                    if (workerMeta.hasLaunched()) {
                        resubmit = shouldResubmitForMissedHeartbeat(
                            workerMeta, currentTime, missedHeartBeatToleranceSecs);
                    } else {
                        resubmit = shouldResubmitUnlaunchedWorker(
                            workerMeta, currentTime, stuckInSubmitToleranceSecs);
                    }
                    if (resubmit) {
                        workersToResubmit.add(worker);
                    }
                }
            }

            for (JobWorker worker : workersToResubmit) {
                try {
                    resubmitWorker(worker);
                } catch (Exception e) {
                    // Best effort: keep resubmitting the remaining workers even if one fails.
                    LOGGER.warn(
                            "Exception {} occurred resubmitting Worker {}",
                            e.getMessage(),
                            worker.getMetadata(),
                            e);
                }
            }
            migrateDisabledVmWorkers(currentTime);
        }

        /**
         * Decides whether a worker that has not launched yet should be resubmitted by this actor.
         *
         * <p>A worker counts as stuck only once it has been in the accepted state for longer than
         * {@code toleranceSecs}. Before that, nothing is recorded. For a stuck worker the stuck-in-accepted
         * counter is incremented. If the scheduler reports that it retries allocations itself, the worker is
         * left to the scheduler and only a warning is logged. Otherwise a WARN status event is published and
         * the worker is selected for resubmission.
         *
         * @param workerMeta    metadata of a worker for which {@code hasLaunched()} is false
         * @param currentTime   the instant treated as "now"
         * @param toleranceSecs how long a worker may remain accepted before it is considered stuck
         * @return {@code true} if this actor should resubmit the worker
         */
        private boolean shouldResubmitUnlaunchedWorker(
            IMantisWorkerMetadata workerMeta, Instant currentTime, long toleranceSecs) {
            // Job Actor should start retry/resubmit workers once a worker gets allocated (before its allocation
            // the retry should be handled by scheduler to avoid retries when the scheduler is still waiting
            // for resources).
            Instant acceptedAt = Instant.ofEpochMilli(workerMeta.getAcceptedAt());
            long secondsInAccepted = Duration.between(acceptedAt, currentTime).getSeconds();
            if (secondsInAccepted <= toleranceSecs) {
                // Still inside the allocation/start-up window: not stuck yet, so neither count nor warn.
                return false;
            }

            this.numWorkerStuckInAccepted.increment();
            if (scheduler.schedulerHandlesAllocationRetries()) {
                // the worker is still waiting for resource allocation and the scheduler should take care of
                // the retry logic.
                LOGGER.warn("Job {}, Worker {} stuck in accepted state since {}, pending scheduler retry",
                    this.jobMgr.getJobId(),
                    workerMeta.getWorkerId(),
                    acceptedAt);
                return false;
            }

            // worker stuck in accepted and the scheduler will not retry allocation requests, so we must resubmit
            LOGGER.info("Resubmitting Job {}, Worker {} that has been stuck in accepted state for {}",
                this.jobMgr.getJobId(), workerMeta.getWorkerId(), secondsInAccepted);
            eventPublisher.publishStatusEvent(new LifecycleEventsProto.WorkerStatusEvent(
                WARN,
                "worker stuck in Accepted state, resubmitting worker",
                workerMeta.getStageNum(),
                workerMeta.getWorkerId(),
                workerMeta.getState()));
            return true;
        }

        /**
         * Decides whether a launched worker has missed heartbeats long enough to be resubmitted.
         *
         * <p>The worker's age is measured from its last heartbeat, or from its launch time if no heartbeat has
         * been received yet. When that age exceeds {@code toleranceSecs}, the missing-heartbeat counter is
         * incremented and a warning is logged. If heartbeat-based termination is enabled in the configuration,
         * a WARN status event is also published and the worker is selected for resubmission. If it is
         * disabled, the skip is logged instead.
         *
         * @param workerMeta    metadata of a worker for which {@code hasLaunched()} is true
         * @param currentTime   the instant treated as "now"
         * @param toleranceSecs maximum allowed seconds without a heartbeat
         * @return {@code true} if this actor should resubmit the worker
         */
        private boolean shouldResubmitForMissedHeartbeat(
            IMantisWorkerMetadata workerMeta, Instant currentTime, long toleranceSecs) {
            boolean hasHeartbeat = workerMeta.getLastHeartbeatAt().isPresent();
            // no heartbeat in a timely manner since launched, or heartbeat too old
            Instant lastSignOfLife = hasHeartbeat
                ? workerMeta.getLastHeartbeatAt().get()
                : Instant.ofEpochMilli(workerMeta.getLaunchedAt());
            long secondsSinceLastSignOfLife = Duration.between(lastSignOfLife, currentTime).getSeconds();
            if (secondsSinceLastSignOfLife <= toleranceSecs) {
                return false;
            }

            this.numWorkerMissingHeartbeat.increment();
            if (hasHeartbeat) {
                LOGGER.warn("Job {}, Worker {} Duration between last heartbeat and now {} "
                        + "missed heart beat threshold {} exceeded",
                    this.jobMgr.getJobId(),
                    workerMeta.getWorkerId(),
                    secondsSinceLastSignOfLife,
                    toleranceSecs);
            } else {
                LOGGER.warn("Job {}, Worker {} hasn't received heartbeat, threshold {} exceeded",
                    this.jobMgr.getJobId(),
                    workerMeta.getWorkerId(),
                    toleranceSecs);
            }

            if (!ConfigurationProvider.getConfig().isHeartbeatTerminationEnabled()) {
                LOGGER.warn(
                    "Heart beat based termination is disabled. Skipping termination of "
                        + "worker {} Please see mantis.worker.heartbeat.termination.enabled",
                    workerMeta);
                return false;
            }

            eventPublisher.publishStatusEvent(new LifecycleEventsProto.WorkerStatusEvent(WARN,
                "heartbeat too old, resubmitting worker", workerMeta.getStageNum(),
                workerMeta.getWorkerId(), workerMeta.getState()));
            return true;
        }