private void initializeRunningWorkers()

in mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java [1487:1566]


        /**
         * Walks every worker of every stage of this job during initialization and brings the
         * scheduler and the published stage assignments back in line with the stored worker state.
         *
         * <ul>
         *   <li>Workers in a running state receive a synthetic heartbeat, are recorded as a
         *       {@code WorkerHost} for their stage, and are handed to
         *       {@code scheduler.initializeRunningWorker}.</li>
         *   <li>Workers in the {@code Accepted} state are queued again: collected into a single
         *       batch when batch scheduling is enabled and the job itself is in an accepted state,
         *       otherwise queued one by one.</li>
         *   <li>Workers returned by {@code markCorruptedWorkers()} are resubmitted at the very end;
         *       a failure to resubmit one worker is logged and does not stop the others.</li>
         * </ul>
         *
         * Stage assignments are only recorded for stages whose stage number is greater than zero.
         */
        private void initializeRunningWorkers() {
            // Identify the corrupted workers first; they are resubmitted once everything else is done.
            final List<JobWorker> corruptedWorkers = markCorruptedWorkers();
            final List<IMantisWorkerMetadata> pendingBatch = new ArrayList<>();

            // A refresh has to go out before any task reaches the Scheduler: WorkerRegistryV2 being
            // updated races with isWorkerValid being called from the SchedulingService loop, and a worker
            // that loop cannot find is treated as invalid and removed from Fenzo state too early.
            markStageAssignmentsChanged(true);

            for (IMantisStageMetadata stage : mantisJobMetaData.getStageMetadata().values()) {
                final Map<Integer, WorkerHost> hostsByWorkerNumber = new HashMap<>();

                for (JobWorker jobWorker : stage.getAllWorkers()) {
                    final IMantisWorkerMetadata meta = jobWorker.getMetadata();

                    if (!WorkerState.isRunningState(meta.getState())) {
                        if (meta.getState().equals(WorkerState.Accepted)) {
                            // Pending workers of a job that is still accepted go out together as one
                            // batch request (relevant when batch requests were pending before a master
                            // failover); in every other case the worker is queued on its own.
                            if (!batchSchedulingEnabled || !JobState.isAcceptedState(mantisJobMetaData.getState())) {
                                queueTask(meta);
                            } else {
                                pendingBatch.add(meta);
                            }
                        }
                        continue;
                    }

                    // Running worker: feed it a fake heartbeat.
                    try {
                        Status heartbeatStatus = new Status(
                                jobId.getId(),
                                stage.getStageNum(),
                                meta.getWorkerIndex(),
                                meta.getWorkerNumber(),
                                Status.TYPE.HEARTBEAT,
                                "",
                                MantisJobState.Started,
                                System.currentTimeMillis());
                        WorkerEvent syntheticHeartbeat = new WorkerHeartbeat(heartbeatStatus);
                        jobWorker.processEvent(syntheticHeartbeat, jobStore);
                    } catch (InvalidWorkerStateChangeException | IOException e) {
                        // Best effort: log and carry on initializing this worker.
                        LOGGER.error("problem sending initial heartbeat for Job {} during initialization",
                                jobWorker.getMetadata().getJobId(), e);
                    }

                    // Record where the worker lives so it shows up in this stage's assignments.
                    WorkerHost host = new WorkerHost(
                            meta.getSlave(),
                            meta.getWorkerIndex(),
                            meta.getWorkerPorts().getPorts(),
                            DataFormatAdapter.convertWorkerStateToMantisJobState(meta.getState()),
                            meta.getWorkerNumber(),
                            meta.getMetricsPort(),
                            meta.getCustomPort());
                    hostsByWorkerNumber.put(meta.getWorkerNumber(), host);

                    // Tell the scheduler about the already-running worker.
                    ScheduleRequest request = createSchedulingRequest(meta, empty());
                    scheduler.initializeRunningWorker(request, meta.getSlave(), meta.getSlaveID());
                }

                if (stage.getStageNum() > 0) {
                    WorkerAssignments assignments = new WorkerAssignments(
                            stage.getStageNum(), stage.getNumWorkers(), hostsByWorkerNumber);
                    stageAssignments.put(stage.getStageNum(), assignments);
                }
            }

            // Submit the collected accepted workers as a single batch, if there are any.
            if (JobState.isAcceptedState(mantisJobMetaData.getState())) {
                if (!pendingBatch.isEmpty()) {
                    queueTasks(pendingBatch, empty());
                }
            }

            // Second refresh, after queuing tasks to Fenzo: the fake heartbeat above may have moved
            // some workers to Started.
            markStageAssignmentsChanged(true);

            // Finally resubmit the workers with missing ports so they can be given new resources.
            for (JobWorker corrupted : corruptedWorkers) {
                LOGGER.warn("discovered workers with missing ports during initialization: {}", corrupted);
                try {
                    resubmitWorker(corrupted);
                } catch (Exception e) {
                    LOGGER.warn("Exception resubmitting worker {} during initializeRunningWorkers due to {}",
                            corrupted, e.getMessage(), e);
                }
            }
        }