in mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java [2161:2269]
/**
 * Applies one worker event to this job's stage and worker bookkeeping.
 *
 * <p>Handling proceeds in this order:
 * <ol>
 *   <li>Resolve the stage for the event; if there is none, pass the event to
 *       {@code terminateUnknownWorkerIfNonTerminal} and stop.</li>
 *   <li>For a {@code WorkerUnscheduleable} event, update the worker's scheduling-ready time with
 *       the value from the resubmit rate limiter, publish an ERROR status event, and stop.</li>
 *   <li>For a {@code WorkerHeartbeat} event, compare the event's worker number with the worker
 *       number the stage holds at the same worker index and log any mismatch. This step only
 *       logs; it takes no corrective action.</li>
 *   <li>Delegate the event to the stage, then act on the resulting worker: queue it for
 *       migration, resubmit it, or archive it.</li>
 *   <li>Re-check whether all workers have started or completed and notify {@code jobMgr}.</li>
 * </ol>
 *
 * <p>Any {@code Exception} raised along the way is caught and logged rather than propagated.
 *
 * @param event    the worker event to process
 * @param jobState job state consulted when deciding whether to resubmit a worker in an error
 *                 state and whether the "all workers started" transition may still fire
 */
public void processEvent(WorkerEvent event, JobState jobState) {
    try {
        final Optional<IMantisStageMetadata> stageLookup = getStageForWorker(event);
        if (!stageLookup.isPresent()) {
            terminateUnknownWorkerIfNonTerminal(event);
            return;
        }
        final IMantisStageMetadata stage = stageLookup.get();

        // A worker that cannot be placed right now is re-queued with a delay; its state is
        // deliberately not updated.
        if (event instanceof WorkerUnscheduleable) {
            final WorkerUnscheduleable unscheduleable = (WorkerUnscheduleable) event;
            scheduler.updateWorkerSchedulingReadyTime(
                event.getWorkerId(),
                resubmitRateLimiter.getWorkerResubmitTime(event.getWorkerId(), stage.getStageNum()));
            eventPublisher.publishStatusEvent(
                new LifecycleEventsProto.WorkerStatusEvent(
                    LifecycleEventsProto.StatusEvent.StatusEventType.ERROR,
                    "rate limiting: no resources to fit worker",
                    unscheduleable.getStageNum(),
                    event.getWorkerId(),
                    WorkerState.Accepted));
            return;
        }

        final MantisStageMetadataImpl stageImpl = (MantisStageMetadataImpl) stage;
        final boolean heartbeat = event instanceof WorkerHeartbeat;

        // Sanity-check the stage's (worker index -> worker number) mapping against the heartbeat.
        // TODO: add termination once confirmed the actual corruption scenario.
        if (heartbeat) {
            try {
                final int reportedIndex = event.getWorkerId().getWorkerIndex();
                final int reportedNum = event.getWorkerId().getWorkerNum();
                final int trackedNum =
                    stageImpl.getWorkerByIndex(reportedIndex).getMetadata().getWorkerNumber();
                if (reportedNum < trackedNum) {
                    // The heartbeat comes from an older worker number on the same worker index.
                    LOGGER.error(
                        "[Corrupted state] StaleWorkerEvent: {}, current worker at {}, Terminate stale "
                            + "worker",
                        event.getWorkerId(),
                        trackedNum);
                } else if (reportedNum > trackedNum) {
                    // Not expected: a new worker assignment should update and persist state first.
                    LOGGER.error(
                        "[Corrupted state] Newer worker num received: {}, Current stage worker: {}",
                        event,
                        trackedNum);
                }
            } catch (InvalidJobException ije) {
                LOGGER.error("Invalid job error when checking event: {}", event, ije);
            }
        }

        try {
            // The stage owns the actual handling of the event.
            final Optional<JobWorker> processed = stageImpl.processWorkerEvent(event, jobStore);
            if (!processed.isPresent()) {
                terminateUnknownWorkerIfNonTerminal(event);
                return;
            }
            final JobWorker worker = processed.get();
            final IMantisWorkerMetadata workerMeta = worker.getMetadata();

            // Workers on a disabled VM are only queued for migration here.
            if (event instanceof WorkerOnDisabledVM) {
                workersToMigrate.add(workerMeta.getWorkerNumber());
                return;
            }

            // A worker in an error state is resubmitted while the job itself is not terminal.
            if (WorkerState.isErrorState(workerMeta.getState()) && !JobState.isTerminalState(jobState)) {
                eventPublisher.publishStatusEvent(
                    new LifecycleEventsProto.WorkerStatusEvent(
                        WARN,
                        "resubmitting lost worker ",
                        workerMeta.getStageNum(),
                        workerMeta.getWorkerId(),
                        workerMeta.getState()));
                recentErrorWorkersCache.put(workerMeta.getWorkerNumber(), true);
                resubmitWorker(worker);
                return;
            }

            // Any other terminal worker state: archive the worker.
            if (WorkerState.isTerminalState(workerMeta.getState())) {
                jobStore.archiveWorker(workerMeta);
                LOGGER.info("Received Worker Complete signal. Wait for all workers to complete before "
                    + "terminating Job {}", jobId);
            }

            if (!heartbeat) {
                markStageAssignmentsChanged(false);
            }
        } catch (Exception e) {
            LOGGER.warn("Exception saving worker update", e);
        }

        // The "all started" transition is only considered while it has not fired yet and the job
        // is not terminal; in every other situation only completion is checked.
        final boolean awaitingStart = !allWorkersStarted && !JobState.isTerminalState(jobState);
        if (awaitingStart && allWorkerStarted()) {
            allWorkersStarted = true;
            jobMgr.onAllWorkersStarted();
            scheduler.unscheduleJob(jobId.getId());
            markStageAssignmentsChanged(true);
        } else if (allWorkerCompleted()) {
            // The two log formats differ only by the trailing "1", which marks the case where
            // completion was detected while still waiting for all workers to start.
            LOGGER.info(
                awaitingStart ? "Job {} All workers completed1" : "Job {} All workers completed",
                jobId);
            allWorkersStarted = false;
            jobMgr.onAllWorkersCompleted();
        }
    } catch (Exception e1) {
        LOGGER.error("Job {} Exception occurred in process worker event ", jobId, e1);
    }
}