private void startScalerService()

in mantis-jm-akka/src/main/java/io/mantisrx/server/worker/jobmaster/akka/rules/ScalerControllerActor.java [147:255]


    /**
     * Activates the rule carried by {@code activateScalerRequest}, replacing the currently active
     * rule and auto scaler service (if any).
     *
     * <p>Steps performed synchronously by this method:
     * <ol>
     *   <li>snapshot the current service/rule, then store the requested rule in {@code activeRule};</li>
     *   <li>create a new {@link JobAutoScalerService} through the factory when at least one stage
     *       config of the rule carries a scaling policy, otherwise set the active service to
     *       {@code null};</li>
     *   <li>submit the start-up work to {@code executionContext} and register a completion
     *       callback;</li>
     *   <li>increment {@code activateCount} and publish the rule id to {@code activeRuleGauge}
     *       (non-numeric ids are logged and skipped).</li>
     * </ol>
     *
     * <p>Steps performed asynchronously on {@code executionContext}: shut down the previous service,
     * scale every stage that declares a non-negative desire size, then start the new service.
     * The async body only touches the values captured before submission; it must not read or write
     * mutable actor fields.
     *
     * @param activateScalerRequest request holding the rule to activate
     */
    private void startScalerService(CoordinatorActor.ActivateRuleRequest activateScalerRequest) {
        // Snapshot the state being replaced before any field is reassigned.
        final JobAutoScalerService currentService = this.activeJobAutoScalerService;
        final JobScalingRule currentRule = this.activeRule;
        // Null-safe id used for logging; there may be no active rule yet.
        final String currentRuleId =
            Optional.ofNullable(currentRule).map(JobScalingRule::getRuleId).orElse("null");

        log.info("[EVENT ACTIVATE RULE] req is higher ranking from current: {}, activating rule: {}",
            currentRuleId,
            activateScalerRequest.getRule().getRuleId());

        this.activeRule = activateScalerRequest.getRule();

        if (this.activeRule.getScalerConfig().getStageConfigMap().entrySet().stream()
            .anyMatch(entry ->
                entry.getValue() != null && entry.getValue().getScalingPolicy() != null)) {
            log.info("Creating Job Auto Scaler service for rule: {}", activateScalerRequest.getRule().getRuleId());
            this.activeJobAutoScalerService = this.jobScalerContext.getJobAutoScalerServiceFactory()
                .apply(this.jobScalerContext, activateScalerRequest.getRule());
        } else {
            // the rule only requested desire size but no scaling policy, no need to create service.
            log.info("No Job Auto Scaler service required for rule: {}", activateScalerRequest.getRule().getRuleId());
            this.activeJobAutoScalerService = null;
        }

        // Capture the new state in locals so the async body never has to look at actor fields.
        final JobAutoScalerService newService = this.activeJobAutoScalerService;
        final JobScalingRule newRule = this.activeRule;

        log.info("closing current service {} for rule: {}, starting new service {} for rule {}",
            currentService,
            currentRuleId,
            newService,
            newRule.getRuleId());
        // Run start service in scaler executor,
        // DO NOT read or mutate mutable actor state in scaler executor! Use the captured locals only.
        Future<String> startServiceFuture = future(() -> {
            // Shut down the current service if one exists. A shutdown failure is logged and
            // rethrown, which fails this future and therefore aborts the activation.
            // NOTE(review): an earlier comment here said shutdown errors are ignored, but the code
            // has always rethrown; confirm which behavior is intended.
            if (currentService != null) {
                log.info("Stopping current Job Auto Scaler service for rule: {}", currentRuleId);
                try {
                    currentService.shutdown();
                } catch (Exception ex) {
                    log.error("failed to stop current job auto scaler service", ex);
                    throw new RuntimeException(ex);
                }
            }

            // first handle stage desire size
            for (Map.Entry<String, JobScalingRule.StageScalerConfig> kv :
                newRule.getScalerConfig().getStageConfigMap().entrySet()) {
                if (kv.getValue() == null || kv.getValue().getDesireSize() == null || kv.getValue().getDesireSize() < 0) {
                    log.info("No valid desire size for stage: {}, ignore", kv.getKey());
                    continue;
                }

                log.info("Start scaling stage {} to desire size {}", kv.getKey(), kv.getValue().getDesireSize());
                // NOTE(review): on a terminal error the handler below substitutes an empty
                // sequence, so first() then fails with NoSuchElementException and the whole
                // activation future fails. If best-effort scaling is intended, a default-valued
                // blocking call would be needed instead; confirm the intent before changing.
                this.jobScalerContext.getMasterClientApi().scaleJobStage(
                        this.jobScalerContext.getJobId(),
                        Integer.parseInt(kv.getKey()),
                        kv.getValue().getDesireSize(),
                        "Desire size from scaling ruleID: " + newRule.getRuleId())
                    .retryWhen(RuleUtils.LimitTenRetryLogic)
                    .doOnCompleted(() ->
                        log.info("Scaled stage {} to desire size {}", kv.getKey(), kv.getValue().getDesireSize()))
                    .onErrorResumeNext(throwable -> {
                        // Trailing throwable argument makes the logger print the root cause.
                        log.error("{} Failed to scale stage {} to desire size {}",
                            this.jobScalerContext.getJobId(), kv.getKey(), kv.getValue().getDesireSize(),
                            throwable);
                        return Observable.empty();
                    })
                    .toBlocking()
                    .first();
                log.info("Finish scaling stage {} to desire size {}", kv.getKey(), kv.getValue().getDesireSize());
            }

            if (newService == null) {
                log.info("[No Scaler Required] Job Auto Scaler service is null for rule: {}", newRule.getRuleId());
            } else {
                // Use the captured rule: the actor's activeRule field may have changed (or been
                // reset to null) by the time this runs on the scaler executor.
                log.info("start activeJobAutoScalerService for {}", newRule.getRuleId());
                newService.start();
            }
            return newRule.getRuleId();
        }, executionContext);

        // Handle the start-up result on the actor's dispatcher.
        // NOTE(review): this callback is scheduled on the dispatcher but is not part of the actor's
        // message processing, so the field writes below may interleave with a message being
        // handled. Consider piping the result back to self as a message instead.
        startServiceFuture.onComplete(result -> {
            if (result.isSuccess()) {
                log.info("Job Auto Scaler started successfully for ruleID: {}", result.get());
            } else {
                log.error("failed to setup job auto scaler service in rule: {}",
                    newRule, result.failed().get());
                // Snapshot once; the field can be null if a previous failure already reset it.
                final JobScalingRule activeNow = this.activeRule;
                // only reset if the failed rule service is still the current active one.
                if (activeNow != null && newRule.getRuleId().equals(activeNow.getRuleId())) {
                    log.error("reset controller actor due to failed rule: {}", activeNow.getRuleId());
                    this.activeRule = null;
                    this.activeJobAutoScalerService = null;

                    throw new RuntimeException("failed to start job scaler", result.failed().get());
                    // getContext().getParent().tell(CoordinatorActor.RefreshRuleRequest.of(this.jobScalerContext.getJobId()), self());
                } else {
                    log.warn("Ignore non-active rule service start failure: {}, current rule: {}",
                        newRule.getRuleId(),
                        activeNow == null ? "null" : activeNow.getRuleId());
                }
            }
            return null;
        }, getContext().dispatcher());

        this.activateCount.increment();
        try {
            // The gauge stores the rule id as a number; non-numeric ids are logged and skipped.
            this.activeRuleGauge.set(Long.parseLong(activateScalerRequest.getRule().getRuleId()));
        } catch (NumberFormatException e) {
            log.error("Unexpected non-number rule id: {}", activateScalerRequest.getRule().getRuleId(), e);
        }
        log.info("Activated scaler rule: {}", activateScalerRequest.getRule().getRuleId());
    }