private void initialize()

in mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/JobClustersManagerActor.java [330:423]


    /**
     * Handles a {@code JobClustersManagerInitialize} message.
     *
     * <p>Creates and watches the {@code JobListHelperActor} child, stores the scheduler factory carried by the
     * message and builds the {@code JobClusterInfoManager}. What happens next depends on
     * {@code initMsg.isLoadJobsFromStore()}:
     * <ul>
     *   <li>{@code false}: the actor switches to {@code initializedBehavior} immediately and replies SUCCESS.</li>
     *   <li>{@code true}: job clusters and active jobs are read from {@code jobStore}, jobs are grouped by
     *       cluster name, and one cluster actor is created and initialized per stored cluster. When the
     *       initialization stream completes the actor switches to {@code initializedBehavior} and replies
     *       SUCCESS; when it errors the actor replies SERVER_ERROR and does not switch behavior. Afterwards the
     *       periodic reconcile timer is started and the asynchronous archived-job load is kicked off.</li>
     * </ul>
     *
     * <p>Any exception escaping the steps above is logged and answered with a SERVER_ERROR response carrying
     * the exception message.
     *
     * @param initMsg initialization request; supplies the scheduler factory, the request id echoed in the
     *                response, and the flag that selects whether state is loaded from the job store
     */
    private void initialize(JobClustersManagerInitialize initMsg) {
        // Resolve the sender once so that every reply below goes to the original requester.
        ActorRef sender = getSender();
        try {
            logger.info("In JobClustersManagerActor:initialize");

            this.jobListHelperActor = getContext().actorOf(JobListHelperActor.props(), "JobListHelperActor");
            getContext().watch(jobListHelperActor);

            mantisSchedulerFactory = initMsg.getScheduler();
            Map<String, IJobClusterMetadata> jobClusterMap = new HashMap<>();

            this.jobClusterInfoManager = new JobClusterInfoManager(jobStore, mantisSchedulerFactory, eventPublisher, costsCalculator);

            if (!initMsg.isLoadJobsFromStore()) {
                // Nothing to restore: go straight to the initialized behavior.
                getContext().become(initializedBehavior);
                sender.tell(new JobClustersManagerInitializeResponse(initMsg.requestId, SUCCESS, "JobClustersManager successfully inited"), getSelf());

            } else {
                List<IJobClusterMetadata> jobClusters = jobStore.loadAllJobClusters();
                logger.info("Read {} job clusters from storage", jobClusters.size());

                List<IMantisJobMetadata> activeJobs = jobStore.loadAllActiveJobs();
                logger.info("Read {} jobs from storage", activeJobs.size());

                // Index clusters by name. Entries without metadata or without a cluster definition are skipped
                // here, so a single bad record cannot abort the whole bootstrap with a NullPointerException.
                for (IJobClusterMetadata jobClusterMeta : jobClusters) {
                    if (jobClusterMeta == null || jobClusterMeta.getJobClusterDefinition() == null) {
                        logger.warn("skipping job cluster metadata without a cluster definition on bootstrap");
                        continue;
                    }
                    String clusterName = jobClusterMeta.getJobClusterDefinition().getName();
                    jobClusterMap.put(clusterName, jobClusterMeta);
                }

                Map<String, List<IMantisJobMetadata>> clusterToJobMap = new HashMap<>();

                // group jobs by cluster
                for (IMantisJobMetadata jobMeta : activeJobs) {
                    String clusterName = jobMeta.getClusterName();
                    clusterToJobMap.computeIfAbsent(clusterName, k -> new ArrayList<>()).add(jobMeta);
                }

                // Per-cluster init timeout: the configured master init timeout minus 60 seconds, unless that
                // would not be positive, in which case the configured value is used unchanged.
                long masterInitTimeoutSecs = ConfigurationProvider.getConfig().getMasterInitTimeoutSecs();
                long timeout = (masterInitTimeoutSecs - 60) > 0 ? (masterInitTimeoutSecs - 60) : masterInitTimeoutSecs;
                // The same timeout applies to every cluster, so build the Duration once.
                Duration clusterInitTimeout = Duration.ofSeconds(timeout);

                // jobClusterMap only holds entries validated above, so no null filtering is needed here.
                Observable.from(jobClusterMap.values())
                        .flatMap((jobClusterMeta) -> {
                            String clusterName = jobClusterMeta.getJobClusterDefinition().getName();
                            Optional<JobClusterInfo> jobClusterInfoO = jobClusterInfoManager.createClusterActorAndRegister(jobClusterMeta.getJobClusterDefinition());
                            if (!jobClusterInfoO.isPresent()) {
                                logger.info("skipping job cluster {} on bootstrap as actor creating failed", clusterName);
                                return Observable.empty();
                            }
                            JobClusterInfo jobClusterInfo = jobClusterInfoO.get();

                            // Hand the cluster its own copy of the active jobs (empty when it has none).
                            List<IMantisJobMetadata> jobList = new ArrayList<>();
                            List<IMantisJobMetadata> activeJobsForCluster = clusterToJobMap.get(clusterName);
                            if (activeJobsForCluster != null) {
                                jobList.addAll(activeJobsForCluster);
                            }

                            JobClusterProto.InitializeJobClusterRequest req = new JobClusterProto.InitializeJobClusterRequest((JobClusterDefinitionImpl) jobClusterMeta.getJobClusterDefinition(),
                                jobClusterMeta.isDisabled(), jobClusterMeta.getLastJobCount(), jobList,
                                "system", getSelf(), false);
                            return jobClusterInfoManager.initializeCluster(jobClusterInfo, req, clusterInitTimeout);
                        })
                        .filter(Objects::nonNull)
                        // Blocking subscribe: the statements after this chain run only once the stream has terminated.
                        .toBlocking()
                        .subscribe((clusterInit) -> {
                            logger.info("JobCluster {} inited with code {}", clusterInit.jobClusterName, clusterInit.responseCode);
                            numJobClusterInitSuccesses.increment();
                        }, (error) -> {
                            logger.warn("Exception initializing clusters {}", error.getMessage(), error);

                            // Deliberately no become(initializedBehavior) here: the actor stays un-initialized on failure.
                            logger.error("JobClusterManagerActor had errors during initialization NOT transitioning to initialized behavior");
                            sender.tell(new JobClustersManagerInitializeResponse(initMsg.requestId, SERVER_ERROR, "JobClustersManager  inited with errors"), getSelf());

                        }, () -> {
                            logger.info("JobClusterManagerActor transitioning to initialized behavior");
                            getContext().become(initializedBehavior);
                            sender.tell(new JobClustersManagerInitializeResponse(initMsg.requestId, SUCCESS, "JobClustersManager successfully inited"), getSelf());
                        });

                // NOTE(review): the timer and the archived-job load below are started even when the error
                // callback above fired and the actor did not become initialized -- confirm this is intended.
                getTimers().startPeriodicTimer(CHECK_CLUSTERS_TIMER_KEY, new ReconcileJobCluster(), Duration.ofSeconds(checkAgainInSecs));
                // kick off loading of archived jobs
                logger.info("Kicking off archived job load asynchronously");
                jobStore.loadAllArchivedJobsAsync();
            }

        } catch (Exception e) {
            logger.error("caught exception", e);
            sender.tell(new JobClustersManagerInitializeResponse(initMsg.requestId, SERVER_ERROR, e.getMessage()), getSelf());
        }

        logger.info("JobClustersManagerActor:initialize ends");
    }