in mantis-runtime-autoscaler-api/src/main/java/io/mantisrx/server/worker/jobmaster/clutch/ClutchAutoScaler.java [146:225]
/**
 * Builds the autoscaling pipeline over a stream of metric events.
 *
 * <p>The incoming stream is shared and split by event type. CPU, JVMMemory and
 * Network events are each composed with their own {@code ClutchController}; the
 * three controller outputs are zipped together (alongside the raw metric values),
 * then combined with the most recent worker count and the most recent error term
 * derived from KafkaLag / DataDrop events. From that a target scale is computed,
 * and, when not in cooldown and when the rounded target differs from the current
 * worker count, the signal is composed with a {@code ClutchMantisStageActuator}.
 *
 * <p>Side effects visible in this method: {@code correction} is updated on every
 * combined emission; each value emitted after the actuator is logged beforehand,
 * recorded in {@code actionCache}, stored in {@code targetScale}, and restarts the
 * cooldown via {@code cooldownTimestamp}.
 *
 * @param metrics stream of autoscaler metric events
 * @return the rounded values emitted after the actuator stage, widened to {@code Object}
 */
public Observable<Object> call(Observable<JobAutoScaler.Event> metrics) {
// Share the upstream so the many filtered views below reuse one subscription
// instead of each subscribing to the source separately.
metrics = metrics
.share();
// One controller per resource. Each uses its own config (falling back to defaultConfig)
// and the same dampening factor, initial size and min/max bounds.
ClutchController cpuController = new ClutchController(CPU, this.config.cpu.getOrElse(defaultConfig), this.gainDampeningFactor, this.initialSize, this.config.minSize, this.config.maxSize);
ClutchController memController = new ClutchController(JVMMemory, this.config.memory.getOrElse(defaultConfig), this.gainDampeningFactor, this.initialSize, this.config.minSize, this.config.maxSize);
ClutchController netController = new ClutchController(Network, this.config.network.getOrElse(defaultConfig), this.gainDampeningFactor, this.initialSize, this.config.minSize, this.config.maxSize);
// Route each resource's events through its controller.
Observable<ClutchControllerOutput> cpuSignal = metrics.filter(event -> event.getType().equals(CPU))
.compose(cpuController);
Observable<ClutchControllerOutput> memorySignal = metrics.filter(event -> event.getType().equals(JVMMemory))
.compose(memController);
Observable<ClutchControllerOutput> networkSignal = metrics.filter(event -> event.getType().equals(Network))
.compose(netController);
// Raw metric values, zipped positionally as (cpu, memory, network).
// NOTE(review): the raw tuple (tup._1 below) is never read in the final map; it still
// takes part in the zip and so influences emission pacing - confirm this is intended.
Observable<Tuple3<Double, Double, Double>> rawMetricsTuples = Observable.zip(
metrics.filter(event -> event.getType().equals(CPU)).map(JobAutoScaler.Event::getValue),
metrics.filter(event -> event.getType().equals(JVMMemory)).map(JobAutoScaler.Event::getValue),
metrics.filter(event -> event.getType().equals(Network)).map(JobAutoScaler.Event::getValue),
Tuple::of);
// Controller outputs, zipped positionally as (cpu, memory, network).
Observable<Tuple3<ClutchControllerOutput, ClutchControllerOutput, ClutchControllerOutput>> controlSignals = Observable.zip(
cpuSignal,
memorySignal,
networkSignal,
Tuple::of);
// Error term from Kafka lag: the event value divided by config.rps.
// NOTE(review): presumably converts lag into a worker-count-like quantity; confirm units
// and whether rps can be zero (the division would then yield Infinity or NaN).
Observable<Double> kafkaLag = metrics.filter(event -> event.getType().equals(StageScalingPolicy.ScalingReason.KafkaLag))
.map(JobAutoScaler.Event::getValue)
.map(x -> x / this.config.rps);
// Error term from data drop: (value / 100) multiplied by the event's worker count.
// NOTE(review): the /100 suggests the value is a percentage - confirm against the producer.
Observable<Double> dataDrop = metrics.filter(event -> event.getType().equals(StageScalingPolicy.ScalingReason.DataDrop))
.map(x -> (x.getValue() / 100.0) * x.getNumWorkers());
// Jobs either read from Kafka and produce lag, or they read from other jobs and produce drops.
// Seeded with 0.0 so an error value is available even before any lag/drop event arrives.
Observable<Double> error = Observable.merge(Observable.just(0.0), kafkaLag, dataDrop);
// Worker count as reported on every metric event, regardless of type.
Observable<Integer> currentScale = metrics.map(JobAutoScaler.Event::getNumWorkers);
// Combine into (rawMetrics, controlSignals, latestWorkerCount, latestError) and derive
// (logMessage, targetScale, currentWorkerCount).
Observable<Tuple3<String, Double, Integer>> controllerSignal = Observable.zip(rawMetricsTuples, controlSignals, Tuple::of)
.withLatestFrom(currentScale, (tup, scale) -> Tuple.of(tup._1, tup._2, scale))
.withLatestFrom(error, (tup, err) -> Tuple.of(tup._1, tup._2, tup._3, err))
.map(tup -> {
int currentWorkerCount = tup._3;
// findDominatingResource picks one of the three controller outputs (rule defined elsewhere).
ClutchControllerOutput dominantResource = findDominatingResource(tup._2);
String resourceName = dominantResource.reason.name();
//
// Correction: fold the error term into a running, decaying adjustment.
//
// Cap the error at maxAdjustment (defaulting to maxSize) and treat anything below 1.0 as zero.
double yhat = tup._4;
yhat = Math.min(yhat, config.maxAdjustment.getOrElse(config.maxSize * 1.0));
yhat = yhat < 1.0 ? 0.0 : yhat;
// Only accumulate while outside the cooldown window; the accumulated value gets the same cap.
if (System.currentTimeMillis() > this.cooldownTimestamp.get()) {
double x = correction.addAndGet(yhat);
x = Math.min(x, config.maxAdjustment.getOrElse(config.maxSize * 1.0));
correction.set(x);
}
correction.set(correction.get() * 0.99); // Exponentially decay our correction.
// A NaN correction (e.g. propagated from a NaN error term) is reset to 0.0.
correction.set(Double.isNaN(correction.get()) ? 0.0 : correction.get());
// Target = ceil(dominant scale) + ceil(correction), passed through enforceMinMax with the
// configured min/max. This local is distinct from the targetScale referenced after the actuator.
Double targetScale = enforceMinMax(Math.ceil(dominantResource.scale) + Math.ceil(correction.get()), this.config.minSize, this.config.maxSize);
String logMessage = String.format(autoscaleLogMessageFormat, scaler.getStage(), targetScale.intValue(), tup._2._1.scale, tup._2._2.scale, tup._2._3.scale, gainDampeningFactor.get(), correction.get(), resourceName);
return Tuple.of(logMessage, targetScale, currentWorkerCount);
});
return controllerSignal
// Suppress signals while the cooldown window is still open.
.filter(__ -> System.currentTimeMillis() > this.cooldownTimestamp.get())
.filter(tup -> Math.abs(Math.round(tup._2) - tup._3) > 0.99) // Only act when the rounded target differs from the current worker count.
.doOnNext(signal -> log.info(signal._1))
.compose(new ClutchMantisStageActuator(this.scaler))
.map(Math::round)
// Record the change relative to the previously stored target, keyed by the current time.
.doOnNext(x -> actionCache.put(System.currentTimeMillis(), x - targetScale.get()))
.doOnNext(targetScale::set)
// Start a new cooldown window: now + cooldownSeconds (default 0) converted to milliseconds.
.doOnNext(__ -> cooldownTimestamp.set(System.currentTimeMillis() + config.cooldownSeconds.getOrElse(0L) * 1000))
.map(x -> (Object) x);
}