in mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java [1988:2089]
public void checkHeartBeats(Instant currentTime) {
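// Periodic liveness check: scan every worker in every stage, collect workers that are stuck in
// Accepted (when the scheduler will not retry allocation for them) or that have missed heartbeats,
// resubmit the collected workers, and finally migrate workers off disabled VMs.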
LOGGER.debug("Using worker timeout {} for job {}", getWorkerTimeoutSecs(), this.jobMgr.getJobId());
// Heartbeat misses are flagged after 3 * heartbeatInterval; use a 1.5x multiplier on the worker timeout as the tolerance for this periodic check
long missedHeartBeatToleranceSecs = (long) (1.5 * getWorkerTimeoutSecs());
// Allow more time for workers to start
long stuckInSubmitToleranceSecs =
missedHeartBeatToleranceSecs + ConfigurationProvider.getConfig().getWorkerInitTimeoutSecs();
List<JobWorker> workersToResubmit = Lists.newArrayList();
// expire worker resubmit entries
resubmitRateLimiter.expireResubmitRecords(currentTime.toEpochMilli());
// For each stage
for (IMantisStageMetadata stage : mantisJobMetaData.getStageMetadata().values()) {
// For each worker in the stage
for (JobWorker worker : stage.getAllWorkers()) {
IMantisWorkerMetadata workerMeta = worker.getMetadata();
// The Job Actor should only retry/resubmit a worker once it has been allocated; before allocation,
// retries are handled by the scheduler, so we avoid resubmitting while the scheduler is still
// waiting for resources.
if (!workerMeta.hasLaunched()) {
Instant acceptedAt = Instant.ofEpochMilli(workerMeta.getAcceptedAt());
this.numWorkerStuckInAccepted.increment();
if (!scheduler.schedulerHandlesAllocationRetries()) {
// worker stuck in accepted and the scheduler will not retry allocation requests, so
// we must resubmit
if (Duration.between(acceptedAt, currentTime).getSeconds() > stuckInSubmitToleranceSecs) {
LOGGER.info("Resubmitting Job {}, Worker {} that has been stuck in accepted state for {}s", this.jobMgr.getJobId(),
workerMeta.getWorkerId(), Duration.between(acceptedAt, currentTime).getSeconds());
workersToResubmit.add(worker);
eventPublisher.publishStatusEvent(new LifecycleEventsProto.WorkerStatusEvent(
WARN,
"worker stuck in Accepted state, resubmitting worker",
workerMeta.getStageNum(),
workerMeta.getWorkerId(),
workerMeta.getState()));
}
} else {
// the worker is still waiting for resource allocation and the scheduler should take care of
// the retry logic.
LOGGER.warn("Job {}, Worker {} stuck in accepted state since {}, pending scheduler retry",
this.jobMgr.getJobId(),
workerMeta.getWorkerId(),
acceptedAt);
}
} else {
// The worker has launched: flag it if no heartbeat has arrived within the tolerance since launch,
// or if the last heartbeat is older than the tolerance.
boolean noTimelyHeartbeatSinceLaunched = !workerMeta.getLastHeartbeatAt().isPresent()
&& Duration.between(Instant.ofEpochMilli(workerMeta.getLaunchedAt()), currentTime)
.getSeconds() > missedHeartBeatToleranceSecs;
boolean heartbeatTooOld = workerMeta.getLastHeartbeatAt().isPresent()
&& Duration.between(workerMeta.getLastHeartbeatAt().get(), currentTime).getSeconds() > missedHeartBeatToleranceSecs;
if (noTimelyHeartbeatSinceLaunched || heartbeatTooOld) {
this.numWorkerMissingHeartbeat.increment();
if (!workerMeta.getLastHeartbeatAt().isPresent()) {
LOGGER.warn("Job {}, Worker {} hasn't received heartbeat, threshold {} exceeded",
this.jobMgr.getJobId(),
workerMeta.getWorkerId(),
missedHeartBeatToleranceSecs);
} else {
LOGGER.warn("Job {}, Worker {} Duration between last heartbeat and now {} "
+ "missed heart beat threshold {} exceeded",
this.jobMgr.getJobId(),
workerMeta.getWorkerId(),
Duration.between(
workerMeta.getLastHeartbeatAt().get(),
currentTime).getSeconds(), missedHeartBeatToleranceSecs);
}
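// Resubmission for missed heartbeats is gated on the heartbeat-termination flag
// (mantis.worker.heartbeat.termination.enabled); when disabled, only log the stale worker.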
if (ConfigurationProvider.getConfig().isHeartbeatTerminationEnabled()) {
eventPublisher.publishStatusEvent(new LifecycleEventsProto.WorkerStatusEvent(WARN,
"heartbeat too old, resubmitting worker", workerMeta.getStageNum(),
workerMeta.getWorkerId(), workerMeta.getState()));
workersToResubmit.add(worker);
} else {
LOGGER.warn(
"Heart beat based termination is disabled. Skipping termination of "
+ "worker {} Please see mantis.worker.heartbeat.termination.enabled",
workerMeta);
}
}
}
}
}
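// Resubmit the collected workers after the scan; a failure for one worker is logged and does not
// block the remaining resubmissions.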
for (JobWorker worker : workersToResubmit) {
try {
resubmitWorker(worker);
} catch (Exception e) {
LOGGER.warn(
"Exception {} occurred resubmitting Worker {}",
e.getMessage(),
worker.getMetadata(),
e);
}
}
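// As part of the same periodic check, migrate workers running on VMs that have been marked disabled.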
migrateDisabledVmWorkers(currentTime);
}