public void onJobClusterInitialize()

in mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/JobClusterActor.java [720:854]


    /**
     * Initializes this job cluster actor from the given request: rebuilds the cluster
     * metadata and SLA enforcer, loads any persisted scaler rules, then follows one of
     * two paths:
     * <ul>
     *   <li><b>Disabled cluster</b> - initializes the job manager, force-completes and
     *       archives any lingering non-terminal jobs (capped at 50 per init to bound
     *       startup work), replies SUCCESS and switches to {@code disabledBehavior}.</li>
     *   <li><b>Enabled cluster</b> - optionally persists the cluster definition (when
     *       {@code createInStore} is set, i.e. a brand-new cluster), sets up cron from
     *       the SLA, re-initializes running jobs and starts the job manager.</li>
     * </ul>
     * Exactly one response message is sent back to {@code initReq.requestor} per request.
     *
     * @param initReq initialization request carrying the cluster definition, last job
     *                number, disabled flag, pre-existing job list and the requestor
     *                to reply to
     */
    public void onJobClusterInitialize(JobClusterProto.InitializeJobClusterRequest initReq) {
        ActorRef sender = getSender();
        logger.info("In onJobClusterInitialize {}", this.name);
        if (logger.isDebugEnabled()) {
            logger.debug("Init Request {}", initReq);
        }
        jobClusterMetadata = new JobClusterMetadataImpl.Builder()
                .withLastJobCount(initReq.lastJobNumber)
                .withIsDisabled(initReq.isDisabled)
                .withJobClusterDefinition(initReq.jobClusterDefinition)
                .build();
        // create sla enforcer
        slaEnforcer = new SLAEnforcer(jobClusterMetadata.getJobClusterDefinition().getSLA());
        long expireFrequency = ConfigurationProvider.getConfig().getCompletedJobPurgeFrequencySeqs();
        String jobClusterName = jobClusterMetadata.getJobClusterDefinition().getName();

        // init scaler rules; a load failure is non-fatal — the actor falls back to the
        // current (empty) rule set rather than aborting initialization
        try {
            IJobClusterScalerRuleData scalerRuleData = jobStore.getJobClusterScalerData(jobClusterName);
            if (scalerRuleData != null) {
                this.jobClusterScalerRuleData = scalerRuleData;
            }
        } catch (IOException iex) {
            logger.error("Failed to load job cluster: {} scaler rules. Fall back to empty rules", this.name, iex);
        }

        // If cluster is disabled
        if (jobClusterMetadata.isDisabled()) {
            logger.info("Cluster {} initialized but is Disabled", jobClusterMetadata
                    .getJobClusterDefinition().getName());
            try {
                jobManager.initialize();
            } catch (Exception e) {
                // NOTE(review): CLIENT_ERROR for an internal initialization failure looks
                // like it should be SERVER_ERROR — confirm before changing, callers may
                // match on the status code.
                sender.tell(new JobClusterProto.InitializeJobClusterResponse(initReq.requestId, CLIENT_ERROR,
                    String.format("JobCluster %s initialization failed",
                        initReq.jobClusterDefinition.getName()),
                    initReq.jobClusterDefinition.getName(), initReq.requestor), getSelf());
                // BUGFIX: stop after reporting the failure. Previously execution fell
                // through, ran the cleanup loop, sent a second (SUCCESS) response for the
                // same request and became disabledBehavior despite the failed init.
                return;
            }

            // Bound the amount of cleanup work done in a single initialization pass.
            int count = 50;
            if (!initReq.jobList.isEmpty()) {
                logger.info("Cluster {} is disabled however it has {} active/accepted jobs",
                    jobClusterName, initReq.jobList.size());
                for (IMantisJobMetadata jobMeta : initReq.jobList) {
                    try {
                        if (count == 0) {
                            logger.info("Max cleanup limit of 50 reached abort");
                            break;
                        }
                        if (!JobState.isTerminalState(jobMeta.getState())) {
                            // A disabled cluster should not carry live jobs: mark them
                            // complete and archive. Per-job failures are logged and skipped
                            // so one bad record cannot block the rest of the cleanup.
                            logger.info("Job {} is in non terminal state {} for disabled cluster {}."
                                    + "Marking it complete", jobMeta.getJobId(), jobMeta.getState(),
                                jobClusterName);
                            count--;
                            jobManager.markCompleted(jobMeta);
                            jobStore.archiveJob(jobMeta);
                        }
                    } catch (Exception e) {
                        logger.error("Exception {} archiving job {} during init ", e.getMessage(), jobMeta.getJobId(), e);
                    }
                }
            }

            sender.tell(new JobClusterProto.InitializeJobClusterResponse(initReq.requestId, SUCCESS,
                    String.format("JobCluster %s initialized successfully. But is currently disabled",
                            initReq.jobClusterDefinition.getName()),initReq.jobClusterDefinition.getName(),
                    initReq.requestor), getSelf());
            logger.info("Job expiry check frequency set to {}", expireFrequency);

            // Switch message handling to the reduced, disabled-cluster behavior.
            getContext().become(disabledBehavior);

            return;
        } else {
            // new cluster initialization: persist the definition only when explicitly
            // requested (createInStore), i.e. this is a create rather than a restore
            if (initReq.createInStore) {
                try {
                    jobStore.createJobCluster(jobClusterMetadata);
                    // TODO (p1): support scale rules creation during job cluster creation.
                    eventPublisher.publishAuditEvent(
                            new LifecycleEventsProto.AuditEvent(
                                    LifecycleEventsProto.AuditEvent.AuditEventType.JOB_CLUSTER_CREATE,
                                jobClusterName,
                                    "saved job cluster " + name)
                    );
                    logger.info("successfully saved job cluster {}", name);
                    numJobClustersInitialized.increment();

                } catch (final JobClusterAlreadyExistsException exists) {
                    numJobClusterInitializeFailures.increment();
                    // Include the exception so the duplicate-create is diagnosable from logs.
                    logger.error("job cluster not created", exists);
                    sender.tell(new JobClusterProto.InitializeJobClusterResponse(initReq.requestId, CLIENT_ERROR,
                            String.format("JobCluster %s already exists",
                                    initReq.jobClusterDefinition.getName()),
                            initReq.jobClusterDefinition.getName(), initReq.requestor), getSelf());
                    // TODO: handle case when job cluster exists in store but Job cluster actor is not running
                    return;
                } catch (final Exception e) {
                    numJobClusterInitializeFailures.increment();
                    logger.error("job cluster not created due to {}", e.getMessage(), e);
                    sender.tell(new JobClusterProto.InitializeJobClusterResponse(initReq.requestId,
                            SERVER_ERROR, String.format("JobCluster %s not created due to %s",
                            initReq.jobClusterDefinition.getName(), Throwables.getStackTraceAsString(e)),
                            initReq.jobClusterDefinition.getName(), initReq.requestor), getSelf());
                    // TODO: send PoisonPill to self if job cluster was not created ? Return'ing for now,
                    //  so we don't send back 2 InitJobClusterResponses
                    return;
                }
            }

            try {
                cronManager = new CronManager(name, getSelf(), jobClusterMetadata.getJobClusterDefinition().getSLA());
            } catch (Exception e) {
                // SchedulerException indicates a bad cron expression in the SLA (caller's
                // input) -> CLIENT_ERROR; anything else is treated as a server fault.
                logger.warn("Exception initializing cron", e);
                getSender().tell(new JobClusterManagerProto.CreateJobClusterResponse(
                    initReq.requestId, e instanceof SchedulerException ? CLIENT_ERROR : SERVER_ERROR,
                    "Job Cluster " + jobClusterName + " could not be created due to cron initialization error" + e.getMessage(),
                    jobClusterName), getSelf());
                return;
            }
            initRunningJobs(initReq, sender);

            logger.info("Job expiry check frequency set to {}", expireFrequency);
            try {
                jobManager.initialize();
            } catch (Exception e) {
                // NOTE(review): presumably initRunningJobs() already replied to the
                // requestor; if so this error tell is a second response for the same
                // request — verify against initRunningJobs before restructuring. Also
                // confirm CLIENT_ERROR vs SERVER_ERROR here (internal failure).
                sender.tell(new JobClusterProto.InitializeJobClusterResponse(initReq.requestId, CLIENT_ERROR,
                    String.format("JobCluster %s initialization failed",
                        initReq.jobClusterDefinition.getName()),
                    initReq.jobClusterDefinition.getName(), initReq.requestor), getSelf());
            }
        }

    }