public void processEvent()

in mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java [2161:2269]


        /**
         * Applies one worker event to this job's stage/worker bookkeeping and reacts to the result.
         *
         * <p>The steps, in order:
         * <ol>
         *   <li>Look up the stage for the event with {@code getStageForWorker}. If there is none,
         *       call {@code terminateUnknownWorkerIfNonTerminal} and stop.</li>
         *   <li>For a {@code WorkerUnscheduleable} event, update the worker's scheduling-ready time
         *       from the resubmit rate limiter, publish an ERROR status event and stop. The stage is
         *       not asked to process the event in this case.</li>
         *   <li>For a {@code WorkerHeartbeat} event, compare the worker number carried by the event
         *       with the one the stage holds for the same worker index. A mismatch is only logged.</li>
         *   <li>Hand the event to the stage via {@code processWorkerEvent}, then: queue the worker for
         *       migration on {@code WorkerOnDisabledVM}; resubmit a worker in an error state while the
         *       job is not terminal; archive a worker in a terminal state; and, for anything other
         *       than a heartbeat, call {@code markStageAssignmentsChanged(false)}.</li>
         *   <li>Unless one of the steps above returned early, notify {@code jobMgr} when all workers
         *       have started or all workers have completed.</li>
         * </ol>
         *
         * <p>An {@code Exception} thrown while the stage processes the event is logged at WARN and the
         * start/completion check still runs. Any other {@code Exception} is logged at ERROR and not
         * propagated to the caller.
         *
         * @param event    the worker event to process
         * @param jobState the job state used to decide whether resubmission and the "all workers
         *                 started" check apply
         */
        public void processEvent(WorkerEvent event, JobState jobState) {
            try {
                final Optional<IMantisStageMetadata> stageLookup = getStageForWorker(event);
                if (!stageLookup.isPresent()) {
                    // No stage found for this worker.
                    terminateUnknownWorkerIfNonTerminal(event);
                    return;
                }

                // A worker that cannot be scheduled right now goes back on the queue with a delay;
                // its state is left untouched.
                if (event instanceof WorkerUnscheduleable) {
                    final WorkerUnscheduleable unscheduleable = (WorkerUnscheduleable) event;
                    scheduler.updateWorkerSchedulingReadyTime(
                            event.getWorkerId(),
                            resubmitRateLimiter.getWorkerResubmitTime(
                                    event.getWorkerId(),
                                    stageLookup.get().getStageNum()));
                    final LifecycleEventsProto.WorkerStatusEvent rateLimitedStatus =
                            new LifecycleEventsProto.WorkerStatusEvent(
                                    LifecycleEventsProto.StatusEvent.StatusEventType.ERROR,
                                    "rate limiting: no resources to fit worker",
                                    unscheduleable.getStageNum(),
                                    event.getWorkerId(),
                                    WorkerState.Accepted);
                    eventPublisher.publishStatusEvent(rateLimitedStatus);
                    return;
                }

                final MantisStageMetadataImpl stage = (MantisStageMetadataImpl) stageLookup.get();

                // Sanity check: does the stage's (worker index -> worker number) mapping agree with
                // the heartbeat? Mismatches are logged only.
                // TODO: add termination once confirmed the actual corruption scenario.
                if (event instanceof WorkerHeartbeat) {
                    try {
                        final int heartbeatIndex = event.getWorkerId().getWorkerIndex();
                        final int heartbeatNumber = event.getWorkerId().getWorkerNum();
                        final int trackedNumber =
                                stage.getWorkerByIndex(heartbeatIndex).getMetadata().getWorkerNumber();
                        if (heartbeatNumber < trackedNumber) {
                            // Heartbeat carries a lower worker number than the one tracked for this index.
                            LOGGER.error(
                                    "[Corrupted state] StaleWorkerEvent: {}, current worker at {}, Terminate stale "
                                            + "worker",
                                    event.getWorkerId(),
                                    trackedNumber);
                        } else if (heartbeatNumber > trackedNumber) {
                            // Not expected: a new worker assignment should update and persist state first.
                            LOGGER.error(
                                    "[Corrupted state] Newer worker num received: {}, Current stage worker: {}",
                                    event,
                                    trackedNumber);
                        }
                    } catch (InvalidJobException ije) {
                        LOGGER.error("Invalid job error when checking event: {}", event, ije);
                    }
                }

                try {
                    // The stage owns the actual state transition for the worker.
                    final Optional<JobWorker> processed = stage.processWorkerEvent(event, jobStore);
                    if (!processed.isPresent()) {
                        terminateUnknownWorkerIfNonTerminal(event);
                        return;
                    }

                    final JobWorker worker = processed.get();
                    final IMantisWorkerMetadata workerMeta = worker.getMetadata();

                    // Worker sits on a disabled VM: remember it for migration and stop here.
                    if (event instanceof WorkerOnDisabledVM) {
                        workersToMigrate.add(workerMeta.getWorkerNumber());
                        return;
                    }

                    // Worker is in an error state while the job is not terminal: resubmit it.
                    if (WorkerState.isErrorState(workerMeta.getState()) && !JobState.isTerminalState(jobState)) {
                        eventPublisher.publishStatusEvent(new LifecycleEventsProto.WorkerStatusEvent(
                                WARN,
                                "resubmitting lost worker ",
                                workerMeta.getStageNum(),
                                workerMeta.getWorkerId(),
                                workerMeta.getState()));
                        recentErrorWorkersCache.put(workerMeta.getWorkerNumber(), true);
                        resubmitWorker(worker);
                        return;
                    }

                    // Worker is in a terminal state: archive it.
                    if (WorkerState.isTerminalState(workerMeta.getState())) {
                        jobStore.archiveWorker(workerMeta);
                        LOGGER.info("Received Worker Complete signal. Wait for all workers to complete before "
                                + "terminating Job {}", jobId);
                    }

                    // Heartbeats are the one event type that does not mark stage assignments changed.
                    if (!(event instanceof WorkerHeartbeat)) {
                        markStageAssignmentsChanged(false);
                    }
                } catch (Exception processingFailure) {
                    LOGGER.warn("Exception saving worker update", processingFailure);
                }

                // The "all started" check only applies while the start has not been recorded yet
                // and the job is not terminal.
                final boolean awaitingStart = !allWorkersStarted && !JobState.isTerminalState(jobState);
                if (awaitingStart && allWorkerStarted()) {
                    allWorkersStarted = true;
                    jobMgr.onAllWorkersStarted();
                    scheduler.unscheduleJob(jobId.getId());
                    markStageAssignmentsChanged(true);
                } else if (allWorkerCompleted()) {
                    // The two log lines differ only by the trailing "1", which identifies the
                    // completion that was detected while still waiting for all workers to start.
                    if (awaitingStart) {
                        LOGGER.info("Job {} All workers completed1", jobId);
                    } else {
                        LOGGER.info("Job {} All workers completed", jobId);
                    }
                    allWorkersStarted = false;
                    jobMgr.onAllWorkersCompleted();
                }
            } catch (Exception unexpected) {
                LOGGER.error("Job {} Exception occurred in process worker event ", jobId, unexpected);
            }
        }