in mantis-runtime-autoscaler-api/src/main/java/io/mantisrx/server/worker/jobmaster/JobAutoScaler.java [186:333]
public void start() {
this.subscription = subject
.onBackpressureBuffer(100, () -> {
logger.info("onOverflow triggered, dropping old events");
}, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST)
.doOnRequest(x -> {
logger.debug("Scaler requested {} metrics.", x);
this.requestMetricsCount.increment();
})
.groupBy(Event::getStage)
.flatMap(go -> {
Integer stage = Optional.ofNullable(go.getKey()).orElse(-1);
logger.debug("System Environment:");
System.getenv().forEach((key, value) -> {
logger.debug("{} = {}", key, value);
});
Optional<String> clutchCustomConfiguration =
this.clutchCustomConfigurationFromRule != null ?
Optional.of(this.clutchCustomConfigurationFromRule) :
Optional.ofNullable(MantisProperties.getProperty("JOB_PARAM_" + SystemParameters.JOB_MASTER_CLUTCH_SYSTEM_PARAM));
if (this.stagePolicyMap.containsKey(stage) &&
(this.stagePolicyMap.get(stage) != null || clutchCustomConfiguration.isPresent())) {
ClutchConfiguration config = null;
int minSize = 0;
int maxSize = 0;
boolean useJsonConfigBased = false;
boolean useClutch = false;
boolean useClutchRps = false;
boolean useClutchExperimental = false;
final StageScalingPolicy scalingPolicy = this.stagePolicyMap.get(stage).getScalingPolicy();
// Determine which type of scaler to use.
if (scalingPolicy != null) {
minSize = scalingPolicy.getMin();
maxSize = scalingPolicy.getMax();
if (scalingPolicy.getStrategies() != null) {
Set<ScalingReason> reasons = scalingPolicy.getStrategies()
.values()
.stream()
.map(StageScalingPolicy.Strategy::getReason)
.collect(Collectors.toSet());
if (reasons.contains(ScalingReason.Clutch)) {
useClutch = true;
} else if (reasons.contains(ScalingReason.ClutchExperimental)) {
useClutchExperimental = true;
} else if (reasons.contains(ScalingReason.ClutchRps)) {
useClutchRps = true;
}
}
}
if (clutchCustomConfiguration.isPresent()) {
try {
config = getClutchConfiguration(clutchCustomConfiguration.get()).get(stage);
} catch (Exception ex) {
logger.error("Error parsing json clutch config: {}", clutchCustomConfiguration.get(), ex);
}
if (config != null) {
if (config.getRpsConfig().isDefined()) {
useClutchRps = true;
} else if (config.getUseExperimental().getOrElse(false)) {
useClutch = true;
} else {
useJsonConfigBased = true;
}
if (config.getMinSize() > 0) {
minSize = config.getMinSize();
}
if (config.getMaxSize() > 0) {
maxSize = config.getMaxSize();
}
}
}
final StageScalingInfo stageScalingInfo = this.stagePolicyMap.get(stage);
int initialSize = stageScalingInfo.getDesireSize();
StageScaler scaler = new StageScaler(stage, scalingPolicy, this.scalerId, this.context);
MantisStageActuator actuator = new MantisStageActuator(initialSize, scaler);
Observable.Transformer<Event, io.mantisrx.control.clutch.Event> transformToClutchEvent =
obs -> obs.map(this::mantisEventToClutchEvent)
.filter(event -> event.metric != null);
Observable<Integer> workerCounts = context.getWorkerMapObservable()
.map(x -> x.getWorkersForStage(go.getKey()).size())
.distinctUntilChanged()
.throttleLast(5, TimeUnit.SECONDS);
if (useClutchRps) {
logger.info("Using clutch rps scaler, job: {}, stage: {} ", jobId, stage);
ClutchRpsPIDConfig rpsConfig = Option.of(config).flatMap(ClutchConfiguration::getRpsConfig).getOrNull();
return go
.compose(transformToClutchEvent)
.compose(new ClutchExperimental(
actuator,
initialSize,
minSize,
maxSize,
workerCounts,
Observable.interval(1, TimeUnit.HOURS),
TimeUnit.MINUTES.toMillis(10),
new RpsClutchConfigurationSelector(stage, stageScalingInfo, config),
new RpsMetricComputer(),
new RpsScaleComputer(rpsConfig)));
} else if (useJsonConfigBased) {
logger.info("Using json config based scaler, job: {}, stage: {} ", jobId, stage);
return go
.compose(new ClutchAutoScaler(scaler, config, initialSize));
} else if (useClutch) {
logger.info("Using clutch scaler, job: {}, stage: {} ", jobId, stage);
return go
.compose(transformToClutchEvent)
.compose(new Clutch(
actuator,
initialSize,
minSize,
maxSize));
} else if (useClutchExperimental) {
logger.info("Using clutch experimental scaler, job: {}, stage: {} ", jobId, stage);
return go
.compose(transformToClutchEvent)
.compose(new ClutchExperimental(
actuator,
initialSize,
minSize,
maxSize,
workerCounts,
Observable.interval(1, TimeUnit.HOURS),
TimeUnit.MINUTES.toMillis(10),
new MantisClutchConfigurationSelector(stage, stageScalingInfo)));
} else {
logger.info("Using rule based scaler, job: {}, stage: {} ", jobId, stage);
return go.compose(new TransformerWrapper<>(
new StageScaleOperator<>(stage, scalingPolicy, this.scalerId, this.context)));
}
} else {
return go;
}
})
.doOnCompleted(() -> logger.info("onComplete on JobAutoScaler subject"))
.doOnError(t -> logger.error("got onError in JobAutoScaler", t))
.doOnSubscribe(() -> logger.info("onSubscribe JobAutoScaler"))
.doOnUnsubscribe(() -> logger.info("Unsubscribing for JobAutoScaler of job {}", jobId))
.retry()
.subscribe();
}