public Observable call()

in mantis-runtime-autoscaler-api/src/main/java/io/mantisrx/server/worker/jobmaster/clutch/ClutchAutoScaler.java [146:225]


    /**
     * Builds the autoscaling pipeline over a stream of metric events.
     *
     * <p>The incoming stream is shared and split by event type. CPU, JVMMemory and Network
     * events each go through their own {@code ClutchController}; the three controller outputs
     * are zipped and the dominating one is chosen with {@code findDominatingResource}. An
     * additive correction derived from KafkaLag and DataDrop events is added on top, and the
     * sum is clamped with {@code enforceMinMax} to {@code [config.minSize, config.maxSize]}.
     * Outside of the cooldown window, and only when the rounded target differs from the
     * current worker count, the signal is logged and handed to
     * {@code ClutchMantisStageActuator}.
     *
     * <p>Side effects visible in this method: {@code correction} is updated on every computed
     * signal; {@code actionCache}, {@code targetScale} and {@code cooldownTimestamp} are
     * updated for every value emitted after the actuator.
     *
     * @param metrics stream of metric events; every event contributes its worker count, and
     *                events of type CPU, JVMMemory, Network, KafkaLag and DataDrop also
     *                contribute their value
     * @return the values emitted after the actuator, rounded with {@code Math.round} and
     *         upcast to {@code Object}
     */
    public Observable<Object> call(Observable<JobAutoScaler.Event> metrics) {
        // share() multicasts one upstream subscription to the many filtered subscribers created below.
        metrics = metrics
                .share();

        // One controller per resource. Each falls back to defaultConfig when its own config is absent.
        ClutchController cpuController = new ClutchController(CPU, this.config.cpu.getOrElse(defaultConfig), this.gainDampeningFactor, this.initialSize, this.config.minSize, this.config.maxSize);
        ClutchController memController = new ClutchController(JVMMemory, this.config.memory.getOrElse(defaultConfig), this.gainDampeningFactor, this.initialSize, this.config.minSize, this.config.maxSize);
        ClutchController netController = new ClutchController(Network, this.config.network.getOrElse(defaultConfig), this.gainDampeningFactor, this.initialSize, this.config.minSize, this.config.maxSize);

        // Route each resource's events through its controller.
        Observable<ClutchControllerOutput> cpuSignal = metrics.filter(event -> event.getType().equals(CPU))
                .compose(cpuController);
        Observable<ClutchControllerOutput> memorySignal = metrics.filter(event -> event.getType().equals(JVMMemory))
                .compose(memController);
        Observable<ClutchControllerOutput> networkSignal = metrics.filter(event -> event.getType().equals(Network))
                .compose(netController);

        // Raw (cpu, memory, network) values, paired positionally by zip.
        // NOTE(review): the map further down never reads these values (tup._1); within this
        // method they only take part in the zip with controlSignals.
        Observable<Tuple3<Double, Double, Double>> rawMetricsTuples = Observable.zip(
                metrics.filter(event -> event.getType().equals(CPU)).map(JobAutoScaler.Event::getValue),
                metrics.filter(event -> event.getType().equals(JVMMemory)).map(JobAutoScaler.Event::getValue),
                metrics.filter(event -> event.getType().equals(Network)).map(JobAutoScaler.Event::getValue),
                Tuple::of);

        // Controller outputs, paired positionally as (cpu, memory, network).
        Observable<Tuple3<ClutchControllerOutput, ClutchControllerOutput, ClutchControllerOutput>> controlSignals = Observable.zip(
                cpuSignal,
                memorySignal,
                networkSignal,
                Tuple::of);

        // Kafka lag divided by config.rps.
        // NOTE(review): presumably this converts lag into a worker-count-sized error — confirm the unit of rps.
        Observable<Double> kafkaLag = metrics.filter(event -> event.getType().equals(StageScalingPolicy.ScalingReason.KafkaLag))
                .map(JobAutoScaler.Event::getValue)
                .map(x -> x / this.config.rps);

        // Data drop value divided by 100 and multiplied by the worker count.
        // NOTE(review): presumably the drop value is a percentage — confirm against the producer.
        Observable<Double> dataDrop = metrics.filter(event -> event.getType().equals(StageScalingPolicy.ScalingReason.DataDrop))
                .map(x -> (x.getValue() / 100.0) * x.getNumWorkers());

        // Jobs either read from Kafka and produce lag, or they read from other jobs and produce drops.
        // The stream is seeded with 0.0 so withLatestFrom below has a value before any lag/drop event arrives.
        Observable<Double> error = Observable.merge(Observable.just(0.0), kafkaLag, dataDrop);

        // Worker count reported by every metric event, regardless of its type.
        Observable<Integer> currentScale = metrics.map(JobAutoScaler.Event::getNumWorkers);

        // For each (raw metrics, control signals) pair, attach the latest worker count and latest
        // error, then compute (log message, target scale, current worker count).
        Observable<Tuple3<String, Double, Integer>> controllerSignal = Observable.zip(rawMetricsTuples, controlSignals, Tuple::of)
                .withLatestFrom(currentScale, (tup, scale) -> Tuple.of(tup._1, tup._2, scale))
                .withLatestFrom(error, (tup, err) -> Tuple.of(tup._1, tup._2, tup._3, err))
                .map(tup -> {
                    int currentWorkerCount = tup._3;
                    ClutchControllerOutput dominantResource = findDominatingResource(tup._2);
                    String resourceName = dominantResource.reason.name();

                    //
                    // Correction: an additive adjustment accumulated from the lag/drop error.
                    //

                    // Cap the error at maxAdjustment (default: maxSize); values below 1.0 count as zero.
                    double yhat = tup._4;
                    yhat = Math.min(yhat, config.maxAdjustment.getOrElse(config.maxSize * 1.0));
                    yhat = yhat < 1.0 ? 0.0 : yhat;

                    // Accumulate only outside the cooldown window, and cap the running total the same way.
                    if (System.currentTimeMillis() > this.cooldownTimestamp.get()) {
                        double x = correction.addAndGet(yhat);
                        x = Math.min(x, config.maxAdjustment.getOrElse(config.maxSize * 1.0));
                        correction.set(x);
                    }
                    // The decay below runs on every signal, including those seen during cooldown.
                    correction.set(correction.get() * 0.99); // Exponentially decay our correction.
                    // A NaN correction is reset to zero rather than propagated into the target scale.
                    correction.set(Double.isNaN(correction.get()) ? 0.0 : correction.get());

                    // Target = ceil(dominant controller scale) + ceil(correction), clamped to [minSize, maxSize].
                    // This local targetScale is distinct from the targetScale used after the actuator below.
                    Double targetScale = enforceMinMax(Math.ceil(dominantResource.scale) + Math.ceil(correction.get()), this.config.minSize, this.config.maxSize);
                    String logMessage = String.format(autoscaleLogMessageFormat, scaler.getStage(), targetScale.intValue(), tup._2._1.scale, tup._2._2.scale, tup._2._3.scale, gainDampeningFactor.get(), correction.get(), resourceName);

                    return Tuple.of(logMessage, targetScale, currentWorkerCount);
                });

        return controllerSignal
                // Drop signals that arrive while the cooldown window is still open.
                .filter(__ -> System.currentTimeMillis() > this.cooldownTimestamp.get())
                .filter(tup -> Math.abs(Math.round(tup._2) - tup._3) > 0.99) // act only when the rounded target differs from the current worker count
                .doOnNext(signal -> log.info(signal._1))
                // NOTE(review): presumably the actuator applies the target to the stage via scaler — confirm in ClutchMantisStageActuator.
                .compose(new ClutchMantisStageActuator(this.scaler))
                .map(Math::round)
                // Record the delta against the previously stored target, keyed by the current time,
                // before the stored target is overwritten on the next line.
                .doOnNext(x -> actionCache.put(System.currentTimeMillis(), x - targetScale.get()))
                .doOnNext(targetScale::set)
                // Open a new cooldown window; cooldownSeconds defaults to 0 when absent.
                .doOnNext(__ -> cooldownTimestamp.set(System.currentTimeMillis() + config.cooldownSeconds.getOrElse(0L) * 1000))
                .map(x -> (Object) x);
    }