in mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java [1487:1566]
/**
 * Restores scheduler and stage-assignment state for the workers recorded in this job's metadata.
 *
 * <p>Workers in a running state receive a synthetic heartbeat, are collected into their stage's
 * worker-host map and are handed to the scheduler as already running. Workers in the
 * {@code Accepted} state are queued for scheduling: individually, or collected and queued as one
 * batch when batch scheduling is enabled and the job itself is in an accepted state. Workers
 * returned by {@code markCorruptedWorkers()} are resubmitted at the very end.
 */
private void initializeRunningWorkers() {
    // Identify corrupted workers up front; they are only resubmitted after everything else is done.
    List<JobWorker> corruptedWorkers = markCorruptedWorkers();
    List<IMantisWorkerMetadata> batchedWorkers = new ArrayList<>();

    // Publish a refresh before anything is enqueued to the Scheduler. There is a potential race
    // between WorkerRegistryV2 being updated and isWorkerValid being called from the
    // SchedulingService loop: a worker that is not found in that loop is considered invalid and is
    // prematurely removed from Fenzo state.
    markStageAssignmentsChanged(true);

    for (IMantisStageMetadata stage : mantisJobMetaData.getStageMetadata().values()) {
        Map<Integer, WorkerHost> hostsByWorkerNumber = new HashMap<>();

        for (JobWorker jobWorker : stage.getAllWorkers()) {
            IMantisWorkerMetadata metadata = jobWorker.getMetadata();

            if (!WorkerState.isRunningState(metadata.getState())) {
                // Not running: only workers still in Accepted state need to be (re)queued.
                if (metadata.getState().equals(WorkerState.Accepted)) {
                    // While the job is in accepted state, pending workers are gathered and queued
                    // together in one batch request. This matters when batch requests were
                    // pending before a master failover.
                    boolean deferToBatch =
                        batchSchedulingEnabled && JobState.isAcceptedState(mantisJobMetaData.getState());
                    if (deferToBatch) {
                        batchedWorkers.add(metadata);
                    } else {
                        queueTask(metadata);
                    }
                }
                continue;
            }

            // Running worker: feed it a fake heartbeat stamped with the current time.
            try {
                Status heartbeatStatus = new Status(
                    jobId.getId(),
                    stage.getStageNum(),
                    metadata.getWorkerIndex(),
                    metadata.getWorkerNumber(),
                    Status.TYPE.HEARTBEAT,
                    "",
                    MantisJobState.Started,
                    System.currentTimeMillis());
                WorkerEvent syntheticHeartbeat = new WorkerHeartbeat(heartbeatStatus);
                jobWorker.processEvent(syntheticHeartbeat, jobStore);
            } catch (InvalidWorkerStateChangeException | IOException ex) {
                // Best effort: log the failure and carry on initializing this worker.
                LOGGER.error("problem sending initial heartbeat for Job {} during initialization",
                    jobWorker.getMetadata().getJobId(), ex);
            }

            // Record where the worker lives, keyed by its worker number.
            hostsByWorkerNumber.put(
                metadata.getWorkerNumber(),
                new WorkerHost(
                    metadata.getSlave(),
                    metadata.getWorkerIndex(),
                    metadata.getWorkerPorts().getPorts(),
                    DataFormatAdapter.convertWorkerStateToMantisJobState(metadata.getState()),
                    metadata.getWorkerNumber(),
                    metadata.getMetricsPort(),
                    metadata.getCustomPort()));

            // Tell the scheduler about the worker that is already running on its host.
            ScheduleRequest runningRequest = createSchedulingRequest(metadata, empty());
            scheduler.initializeRunningWorker(runningRequest, metadata.getSlave(), metadata.getSlaveID());
        }

        // Stage assignments are only recorded for stages with a positive stage number.
        if (stage.getStageNum() > 0) {
            stageAssignments.put(
                stage.getStageNum(),
                new WorkerAssignments(stage.getStageNum(), stage.getNumWorkers(), hostsByWorkerNumber));
        }
    }

    // Queue everything that was held back for batching, in a single request.
    if (JobState.isAcceptedState(mantisJobMetaData.getState()) && !batchedWorkers.isEmpty()) {
        queueTasks(batchedWorkers, empty());
    }

    // Publish a second update now that tasks have been queued to Fenzo, in case the fake
    // heartbeat in the loop above moved some workers to Started.
    markStageAssignmentsChanged(true);

    // Finally resubmit the workers with missing ports so they can be assigned new resources.
    for (JobWorker corrupted : corruptedWorkers) {
        LOGGER.warn("discovered workers with missing ports during initialization: {}", corrupted);
        try {
            resubmitWorker(corrupted);
        } catch (Exception ex) {
            LOGGER.warn("Exception resubmitting worker {} during initializeRunningWorkers due to {}",
                corrupted, ex.getMessage(), ex);
        }
    }
}