public void start()

in mantis-runtime-autoscaler-api/src/main/java/io/mantisrx/server/worker/jobmaster/JobAutoScaler.java [186:333]


    /**
     * Builds the per-stage auto-scaling pipeline on top of {@code subject}, subscribes to it and
     * stores the resulting subscription in {@code this.subscription}.
     *
     * <p>Pipeline outline:
     * <ol>
     *   <li>Incoming events are buffered (capacity 100); on overflow the oldest events are dropped.</li>
     *   <li>Events are grouped by {@code Event::getStage}; a {@code null} key is treated as stage -1.</li>
     *   <li>For every group whose stage has scaling info in {@code stagePolicyMap}, a scaler is
     *       composed onto the group. The scaler type is derived from the stage's scaling policy
     *       strategies and, if present, from the clutch custom configuration (taken from
     *       {@code clutchCustomConfigurationFromRule} or, failing that, from the job parameter
     *       property). When several flags end up set, the precedence is: clutch rps, json config
     *       based, clutch, clutch experimental, and finally the rule based scaler.</li>
     *   <li>Groups for stages without scaling info are passed through unchanged.</li>
     * </ol>
     *
     * <p>The whole chain is wrapped in {@code retry()}, so an error signalled anywhere upstream
     * causes a resubscription rather than terminating the subscription.
     */
    public void start() {
        this.subscription = subject
            .onBackpressureBuffer(100, () -> {
                logger.info("onOverflow triggered, dropping old events");
            }, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST)
            .doOnRequest(x -> {
                logger.debug("Scaler requested {} metrics.", x);
                this.requestMetricsCount.increment();
            })
            .groupBy(Event::getStage)
            .flatMap(go -> {
                Integer stage = Optional.ofNullable(go.getKey()).orElse(-1);

                // Only walk the environment when the output would actually be logged.
                // NOTE(review): environment values may contain credentials; consider whether
                // dumping them (even at debug level) is acceptable.
                if (logger.isDebugEnabled()) {
                    logger.debug("System Environment:");
                    System.getenv().forEach((key, value) -> {
                        logger.debug("{} = {}", key, value);
                    });
                }

                // A configuration supplied by a rule takes priority over the job parameter property.
                Optional<String> clutchCustomConfiguration =
                    this.clutchCustomConfigurationFromRule != null ?
                        Optional.of(this.clutchCustomConfigurationFromRule) :
                        Optional.ofNullable(MantisProperties.getProperty("JOB_PARAM_" + SystemParameters.JOB_MASTER_CLUTCH_SYSTEM_PARAM));

                // Look the stage up once. Everything below needs the scaling info (scaling policy and
                // desired size), so a stage without it cannot be scaled and is passed through.
                final StageScalingInfo stageScalingInfo = this.stagePolicyMap.get(stage);
                if (stageScalingInfo == null) {
                    if (clutchCustomConfiguration.isPresent() && this.stagePolicyMap.containsKey(stage)) {
                        // Dereferencing the missing info here would raise a NullPointerException inside
                        // flatMap, and retry() would then resubscribe the whole pipeline on every event.
                        logger.warn("No scaling info for job: {}, stage: {}; clutch configuration is ignored for this stage",
                            jobId, stage);
                    }
                    return go;
                }

                ClutchConfiguration config = null;
                int minSize = 0;
                int maxSize = 0;
                boolean useJsonConfigBased = false;
                boolean useClutch = false;
                boolean useClutchRps = false;
                boolean useClutchExperimental = false;

                final StageScalingPolicy scalingPolicy = stageScalingInfo.getScalingPolicy();
                // Determine which type of scaler to use.
                if (scalingPolicy != null) {
                    minSize = scalingPolicy.getMin();
                    maxSize = scalingPolicy.getMax();
                    if (scalingPolicy.getStrategies() != null) {
                        Set<ScalingReason> reasons = scalingPolicy.getStrategies()
                            .values()
                            .stream()
                            .map(StageScalingPolicy.Strategy::getReason)
                            .collect(Collectors.toSet());
                        if (reasons.contains(ScalingReason.Clutch)) {
                            useClutch = true;
                        } else if (reasons.contains(ScalingReason.ClutchExperimental)) {
                            useClutchExperimental = true;
                        } else if (reasons.contains(ScalingReason.ClutchRps)) {
                            useClutchRps = true;
                        }
                    }
                }
                if (clutchCustomConfiguration.isPresent()) {
                    try {
                        config = getClutchConfiguration(clutchCustomConfiguration.get()).get(stage);
                    } catch (Exception ex) {
                        // A malformed configuration is logged and ignored; selection then relies on
                        // the flags derived from the scaling policy above.
                        logger.error("Error parsing json clutch config: {}", clutchCustomConfiguration.get(), ex);
                    }
                    if (config != null) {
                        if (config.getRpsConfig().isDefined()) {
                            useClutchRps = true;
                        } else if (config.getUseExperimental().getOrElse(false)) {
                            // NOTE(review): "useExperimental" selects the Clutch scaler, not
                            // ClutchExperimental - confirm this mapping is intended.
                            useClutch = true;
                        } else {
                            useJsonConfigBased = true;
                        }
                        // Positive sizes in the clutch configuration override the policy's bounds.
                        if (config.getMinSize() > 0) {
                            minSize = config.getMinSize();
                        }
                        if (config.getMaxSize() > 0) {
                            maxSize = config.getMaxSize();
                        }
                    }
                }

                int initialSize = stageScalingInfo.getDesireSize();
                StageScaler scaler = new StageScaler(stage, scalingPolicy, this.scalerId, this.context);
                MantisStageActuator actuator = new MantisStageActuator(initialSize, scaler);

                // Converts Mantis events to clutch events, discarding those without a metric.
                Observable.Transformer<Event, io.mantisrx.control.clutch.Event> transformToClutchEvent =
                    obs -> obs.map(this::mantisEventToClutchEvent)
                        .filter(event -> event.metric != null);
                // Worker count of this stage: only changes are emitted, sampled every 5 seconds.
                Observable<Integer> workerCounts = context.getWorkerMapObservable()
                    .map(x -> x.getWorkersForStage(stage).size())
                    .distinctUntilChanged()
                    .throttleLast(5, TimeUnit.SECONDS);

                if (useClutchRps) {
                    logger.info("Using clutch rps scaler, job: {}, stage: {} ", jobId, stage);
                    // config may be null here when the rps scaler was requested via the scaling policy only.
                    ClutchRpsPIDConfig rpsConfig = Option.of(config).flatMap(ClutchConfiguration::getRpsConfig).getOrNull();
                    return go
                        .compose(transformToClutchEvent)
                        .compose(new ClutchExperimental(
                            actuator,
                            initialSize,
                            minSize,
                            maxSize,
                            workerCounts,
                            Observable.interval(1, TimeUnit.HOURS),
                            TimeUnit.MINUTES.toMillis(10),
                            new RpsClutchConfigurationSelector(stage, stageScalingInfo, config),
                            new RpsMetricComputer(),
                            new RpsScaleComputer(rpsConfig)));
                } else if (useJsonConfigBased) {
                    logger.info("Using json config based scaler, job: {}, stage: {} ", jobId, stage);
                    return go
                        .compose(new ClutchAutoScaler(scaler, config, initialSize));
                } else if (useClutch) {
                    logger.info("Using clutch scaler, job: {}, stage: {} ", jobId, stage);
                    return go
                        .compose(transformToClutchEvent)
                        .compose(new Clutch(
                            actuator,
                            initialSize,
                            minSize,
                            maxSize));
                } else if (useClutchExperimental) {
                    logger.info("Using clutch experimental scaler, job: {}, stage: {} ", jobId, stage);
                    return go
                        .compose(transformToClutchEvent)
                        .compose(new ClutchExperimental(
                            actuator,
                            initialSize,
                            minSize,
                            maxSize,
                            workerCounts,
                            Observable.interval(1, TimeUnit.HOURS),
                            TimeUnit.MINUTES.toMillis(10),
                            new MantisClutchConfigurationSelector(stage, stageScalingInfo)));
                } else {
                    logger.info("Using rule based scaler, job: {}, stage: {} ", jobId, stage);
                    return go.compose(new TransformerWrapper<>(
                        new StageScaleOperator<>(stage, scalingPolicy, this.scalerId, this.context)));
                }
            })
            .doOnCompleted(() -> logger.info("onComplete on JobAutoScaler subject"))
            .doOnError(t -> logger.error("got onError in JobAutoScaler", t))
            .doOnSubscribe(() -> logger.info("onSubscribe JobAutoScaler"))
            .doOnUnsubscribe(() -> logger.info("Unsubscribing for JobAutoScaler of job {}", jobId))
            .retry()
            .subscribe();
    }