in mantis-runtime-autoscaler-api/src/main/java/io/mantisrx/server/worker/jobmaster/JobAutoScaler.java [550:623]
public Subscriber<? super Event> call(final Subscriber<? super Object> child) {
    // Returns the subscriber that consumes autoscale events for this stage. For every event
    // it records the effective value in per-reason rolling stats and, once the cooldown since
    // the last scaling action has elapsed, asks the scaler to scale the stage up or down.
    return new Subscriber<Event>() {
        // Rolling usage stats per scaling reason; an entry is created on the first event of that reason.
        private final Map<StageScalingPolicy.ScalingReason, UsageDataStats> dataStatsMap = new HashMap<>();

        @Override
        public void onCompleted() {
            // Upstream is done: release the downstream subscriber.
            child.unsubscribe();
        }

        @Override
        public void onError(Throwable e) {
            // The error is only logged here; it is not forwarded to the child subscriber.
            logger.error("Unexpected error: " + e.getMessage(), e);
        }

        @Override
        public void onNext(Event event) {
            final long coolDownSecs = scalingPolicy.getCoolDownSecs();
            final boolean autoscalingEnabled = scalingPolicy.isEnabled();
            logger.debug("Will check for autoscaling job {} stage {} due to event: {}", jobId, stage, event);
            if (!autoscalingEnabled) {
                return;
            }

            final StageScalingPolicy.ScalingReason reason = event.getType();
            final StageScalingPolicy.Strategy strategy = scalingPolicy.getStrategies().get(reason);
            if (strategy == null) {
                // No strategy configured for this reason, so the event is ignored.
                return;
            }

            final double effectiveValue = event.getEffectiveValue();
            final UsageDataStats stats = dataStatsMap.computeIfAbsent(
                    reason,
                    key -> new UsageDataStats(
                            strategy.getScaleUpAbovePct(),
                            strategy.getScaleDownBelowPct(),
                            strategy.getRollingCount()));
            // The sample is recorded before the cooldown check, so it counts even while cooling down.
            stats.add(effectiveValue);

            final boolean coolDownElapsed = lastScaledAt < (System.currentTimeMillis() - coolDownSecs * 1000);
            if (!coolDownElapsed) {
                logger.debug("lastScaledAt {} within cooldown period", lastScaledAt);
                return;
            }

            final String formattedValue = String.format(PercentNumberFormat, effectiveValue);
            logger.info("{}, stage {}, eventType {}: eff={}, thresh={}", jobId, stage, reason,
                    formattedValue, strategy.getScaleUpAbovePct());

            // The high threshold is checked first; the low threshold is only considered otherwise.
            if (stats.getHighThreshTriggered()) {
                requestScaleUp(event, reason, strategy, stats, formattedValue);
            } else if (stats.getLowThreshTriggered()) {
                requestScaleDown(event, reason, strategy, stats, formattedValue);
            }
        }

        // Asks the scaler for a larger worker count and, if it is larger, issues the scale-up request.
        private void requestScaleUp(Event event,
                                    StageScalingPolicy.ScalingReason reason,
                                    StageScalingPolicy.Strategy strategy,
                                    UsageDataStats stats,
                                    String formattedValue) {
            logger.info("Attempting to scale up stage {} of job {} by {} workers, because {} exceeded scaleUpThreshold of {} {} times",
                    stage, jobId, scalingPolicy.getIncrement(), reason,
                    String.format(PercentNumberFormat, strategy.getScaleUpAbovePct()),
                    stats.getCurrentHighCount());
            final int currentWorkers = event.getNumWorkers();
            final int targetWorkers =
                    scaler.getDesiredWorkersForScaleUp(scalingPolicy.getIncrement(), currentWorkers, event);
            if (targetWorkers <= currentWorkers) {
                logger.debug("scale up NOOP: desiredWorkers same as current workers");
                return;
            }
            final String why = reason + " with value " + formattedValue
                    + " exceeded scaleUp threshold of " + strategy.getScaleUpAbovePct();
            scaler.scaleUpStage(currentWorkers, targetWorkers, why);
            // Restart the cooldown window from the moment the request was issued.
            lastScaledAt = System.currentTimeMillis();
            logger.info("lastScaledAt set to {} after scale up request", lastScaledAt);
        }

        // Asks the scaler for a smaller worker count and, if it is smaller, issues the scale-down request.
        private void requestScaleDown(Event event,
                                      StageScalingPolicy.ScalingReason reason,
                                      StageScalingPolicy.Strategy strategy,
                                      UsageDataStats stats,
                                      String formattedValue) {
            logger.info("Attempting to scale down stage {} of job {} by {} workers, because {} is below scaleDownThreshold of {} {} times",
                    stage, jobId, scalingPolicy.getDecrement(), reason,
                    strategy.getScaleDownBelowPct(), stats.getCurrentLowCount());
            final int currentWorkers = event.getNumWorkers();
            final int targetWorkers =
                    scaler.getDesiredWorkersForScaleDown(scalingPolicy.getDecrement(), currentWorkers, event);
            if (targetWorkers >= currentWorkers) {
                logger.debug("scale down NOOP: desiredWorkers same as current workers");
                return;
            }
            final String why = reason + " with value " + formattedValue
                    + " is below scaleDown threshold of " + strategy.getScaleDownBelowPct();
            scaler.scaleDownStage(currentWorkers, targetWorkers, why);
            // Restart the cooldown window from the moment the request was issued.
            lastScaledAt = System.currentTimeMillis();
            logger.info("lastScaledAt set to {} after scale down request", lastScaledAt);
        }
    };
}