in mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/JobClustersManagerActor.java [330:423]
/**
 * Handles the {@code JobClustersManagerInitialize} message.
 *
 * <p>Creates and watches the {@code JobListHelperActor} child, records the scheduler factory
 * carried by the message and builds the {@code JobClusterInfoManager}. What happens next
 * depends on {@code initMsg.isLoadJobsFromStore()}:
 * <ul>
 *   <li>{@code false}: switches straight to {@code initializedBehavior} and replies
 *       {@code SUCCESS} to the sender.</li>
 *   <li>{@code true}: loads all job clusters and all active jobs from {@code jobStore}, groups
 *       the jobs by cluster name, creates one cluster actor per stored cluster and initializes
 *       it, blocking until the whole pipeline terminates. On completion the actor switches to
 *       {@code initializedBehavior} and replies {@code SUCCESS}; on a pipeline error it replies
 *       {@code SERVER_ERROR} and keeps its current behavior. The periodic
 *       {@code ReconcileJobCluster} timer and the asynchronous archived-job load are started
 *       after the blocking subscribe returns.</li>
 * </ul>
 *
 * <p>Any exception thrown directly by this method is logged and answered with a
 * {@code SERVER_ERROR} response carrying the exception message.
 *
 * @param initMsg the initialization request; supplies the scheduler factory, the request id
 *                echoed in the response, and the flag that selects loading from the store
 */
private void initialize(JobClustersManagerInitialize initMsg) {
    // Capture the sender up front: the reply is sent from callbacks further down.
    ActorRef sender = getSender();
    try {
        logger.info("In JobClustersManagerActor:initialize");
        this.jobListHelperActor = getContext().actorOf(JobListHelperActor.props(), "JobListHelperActor");
        getContext().watch(jobListHelperActor);
        mantisSchedulerFactory = initMsg.getScheduler();
        this.jobClusterInfoManager = new JobClusterInfoManager(jobStore, mantisSchedulerFactory, eventPublisher, costsCalculator);

        if (!initMsg.isLoadJobsFromStore()) {
            // Nothing to restore: the manager is ready immediately.
            getContext().become(initializedBehavior);
            sender.tell(new JobClustersManagerInitializeResponse(initMsg.requestId, SUCCESS, "JobClustersManager successfully inited"), getSelf());
        } else {
            List<IJobClusterMetadata> jobClusters = jobStore.loadAllJobClusters();
            logger.info("Read {} job clusters from storage", jobClusters.size());
            List<IMantisJobMetadata> activeJobs = jobStore.loadAllActiveJobs();
            logger.info("Read {} jobs from storage", activeJobs.size());

            // Index clusters by name. Entries without a definition are skipped here, before the
            // name is dereferenced, so one malformed record cannot abort the whole bootstrap
            // with a NullPointerException.
            Map<String, IJobClusterMetadata> jobClusterMap = new HashMap<>();
            for (IJobClusterMetadata jobClusterMeta : jobClusters) {
                if (jobClusterMeta == null || jobClusterMeta.getJobClusterDefinition() == null) {
                    logger.warn("Skipping job cluster metadata with no definition loaded from storage");
                    continue;
                }
                jobClusterMap.put(jobClusterMeta.getJobClusterDefinition().getName(), jobClusterMeta);
            }

            // group jobs by cluster
            Map<String, List<IMantisJobMetadata>> clusterToJobMap = new HashMap<>();
            for (IMantisJobMetadata jobMeta : activeJobs) {
                clusterToJobMap.computeIfAbsent(jobMeta.getClusterName(), k -> new ArrayList<>()).add(jobMeta);
            }

            // Per-cluster init timeout: the configured master init timeout minus 60 seconds
            // when the configured value is larger than 60, otherwise the configured value as is.
            long masterInitTimeoutSecs = ConfigurationProvider.getConfig().getMasterInitTimeoutSecs();
            long timeout = masterInitTimeoutSecs > 60 ? masterInitTimeoutSecs - 60 : masterInitTimeoutSecs;
            Duration clusterInitTimeout = Duration.ofSeconds(timeout);

            Observable.from(jobClusterMap.values())
                .flatMap((jobClusterMeta) -> {
                    String clusterName = jobClusterMeta.getJobClusterDefinition().getName();
                    Optional<JobClusterInfo> jobClusterInfoO = jobClusterInfoManager.createClusterActorAndRegister(jobClusterMeta.getJobClusterDefinition());
                    if (!jobClusterInfoO.isPresent()) {
                        logger.info("skipping job cluster {} on bootstrap as actor creating failed", clusterName);
                        return Observable.empty();
                    }
                    JobClusterInfo jobClusterInfo = jobClusterInfoO.get();

                    // Hand the cluster its own copy of the active jobs (empty when it has none).
                    List<IMantisJobMetadata> jobList = new ArrayList<>();
                    List<IMantisJobMetadata> storedJobs = clusterToJobMap.get(clusterName);
                    if (storedJobs != null) {
                        jobList.addAll(storedJobs);
                    }

                    JobClusterProto.InitializeJobClusterRequest req = new JobClusterProto.InitializeJobClusterRequest(
                        (JobClusterDefinitionImpl) jobClusterMeta.getJobClusterDefinition(),
                        jobClusterMeta.isDisabled(), jobClusterMeta.getLastJobCount(), jobList,
                        "system", getSelf(), false);
                    return jobClusterInfoManager.initializeCluster(jobClusterInfo, req, clusterInitTimeout);
                })
                .filter(Objects::nonNull)
                // Block so the callbacks below run before this method moves on.
                .toBlocking()
                .subscribe((clusterInit) -> {
                    logger.info("JobCluster {} inited with code {}", clusterInit.jobClusterName, clusterInit.responseCode);
                    numJobClusterInitSuccesses.increment();
                }, (error) -> {
                    logger.warn("Exception initializing clusters {}", error.getMessage(), error);
                    logger.error("JobClusterManagerActor had errors during initialization NOT transitioning to initialized behavior");
                    // The actor intentionally keeps its current behavior here; only the
                    // failure is reported back to the sender.
                    sender.tell(new JobClustersManagerInitializeResponse(initMsg.requestId, SERVER_ERROR, "JobClustersManager inited with errors"), getSelf());
                }, () -> {
                    logger.info("JobClusterManagerActor transitioning to initialized behavior");
                    getContext().become(initializedBehavior);
                    sender.tell(new JobClustersManagerInitializeResponse(initMsg.requestId, SUCCESS, "JobClustersManager successfully inited"), getSelf());
                });

            getTimers().startPeriodicTimer(CHECK_CLUSTERS_TIMER_KEY, new ReconcileJobCluster(), Duration.ofSeconds(checkAgainInSecs));
            // kick off loading of archived jobs
            logger.info("Kicking off archived job load asynchronously");
            jobStore.loadAllArchivedJobsAsync();
        }
    } catch (Exception e) {
        logger.error("caught exception", e);
        sender.tell(new JobClustersManagerInitializeResponse(initMsg.requestId, SERVER_ERROR, e.getMessage()), getSelf());
    }
    logger.info("JobClustersManagerActor:initialize ends");
}