public Subscriber call()

in mantis-runtime-autoscaler-api/src/main/java/io/mantisrx/server/worker/jobmaster/JobAutoScaler.java [550:623]


      /**
       * Creates the subscriber that consumes autoscaling {@link Event}s and turns them into
       * scale-up / scale-down requests on {@code scaler}.
       *
       * <p>Nothing is ever emitted to {@code child}; it is only unsubscribed once the upstream
       * completes. Upstream errors are logged and not forwarded.
       *
       * @param child the downstream subscriber supplied when this operator is applied
       * @return a subscriber that evaluates every incoming event against {@code scalingPolicy}
       */
      public Subscriber<? super Event> call(final Subscriber<? super Object> child) {

        return new Subscriber<Event>() {
          // Usage statistics accumulated per scaling reason; entries are created on first use.
          private final Map<StageScalingPolicy.ScalingReason, UsageDataStats> dataStatsMap = new HashMap<>();

          @Override
          public void onCompleted() {
            child.unsubscribe();
          }

          @Override
          public void onError(Throwable e) {
            // Log only: the error is not passed on to child.
            logger.error("Unexpected error: " + e.getMessage(), e);
          }

          @Override
          public void onNext(Event event) {
            final long coolDownSecs = scalingPolicy.getCoolDownSecs();
            final boolean autoscalingEnabled = scalingPolicy.isEnabled();
            logger.debug("Will check for autoscaling job {} stage {} due to event: {}", jobId, stage, event);
            if (!autoscalingEnabled) {
              return;
            }

            final StageScalingPolicy.Strategy strategy = scalingPolicy.getStrategies().get(event.getType());
            if (strategy == null) {
              // No strategy is configured for this event type; the event is ignored.
              return;
            }

            // The sample is always recorded, even when the cooldown check below blocks scaling.
            final double effectiveValue = event.getEffectiveValue();
            final UsageDataStats usageStats = dataStatsMap.computeIfAbsent(
                event.getType(),
                reason -> new UsageDataStats(
                    strategy.getScaleUpAbovePct(), strategy.getScaleDownBelowPct(), strategy.getRollingCount()));
            usageStats.add(effectiveValue);

            final long coolDownCutoff = System.currentTimeMillis() - coolDownSecs * 1000;
            if (lastScaledAt >= coolDownCutoff) {
              logger.debug("lastScaledAt {} within cooldown period", lastScaledAt);
              return;
            }

            logger.info("{}, stage {}, eventType {}: eff={}, thresh={}", jobId, stage, event.getType(),
                String.format(PercentNumberFormat, effectiveValue), strategy.getScaleUpAbovePct());

            // The high threshold is checked first; the low threshold only when the high one did not trigger.
            if (usageStats.getHighThreshTriggered()) {
              attemptScaleUp(event, strategy, usageStats, effectiveValue);
            } else if (usageStats.getLowThreshTriggered()) {
              attemptScaleDown(event, strategy, usageStats, effectiveValue);
            }
          }

          // Requests a scale up when the scaler's desired worker count exceeds the current count.
          private void attemptScaleUp(Event event, StageScalingPolicy.Strategy strategy,
                                      UsageDataStats usageStats, double effectiveValue) {
            logger.info("Attempting to scale up stage {} of job {} by {} workers, because {} exceeded scaleUpThreshold of {} {} times",
                stage, jobId, scalingPolicy.getIncrement(), event.getType(),
                String.format(PercentNumberFormat, strategy.getScaleUpAbovePct()),
                usageStats.getCurrentHighCount());

            final int currentWorkers = event.getNumWorkers();
            final int targetWorkers =
                scaler.getDesiredWorkersForScaleUp(scalingPolicy.getIncrement(), currentWorkers, event);
            if (targetWorkers <= currentWorkers) {
              logger.debug("scale up NOOP: desiredWorkers same as current workers");
              return;
            }

            final String reason = event.getType() + " with value "
                + String.format(PercentNumberFormat, effectiveValue)
                + " exceeded scaleUp threshold of " + strategy.getScaleUpAbovePct();
            scaler.scaleUpStage(currentWorkers, targetWorkers, reason);
            lastScaledAt = System.currentTimeMillis();
            logger.info("lastScaledAt set to {} after scale up request", lastScaledAt);
          }

          // Requests a scale down when the scaler's desired worker count is below the current count.
          private void attemptScaleDown(Event event, StageScalingPolicy.Strategy strategy,
                                        UsageDataStats usageStats, double effectiveValue) {
            logger.info("Attempting to scale down stage {} of job {} by {} workers, because {} is below scaleDownThreshold of {} {} times",
                stage, jobId, scalingPolicy.getDecrement(), event.getType(),
                strategy.getScaleDownBelowPct(), usageStats.getCurrentLowCount());

            final int currentWorkers = event.getNumWorkers();
            final int targetWorkers =
                scaler.getDesiredWorkersForScaleDown(scalingPolicy.getDecrement(), currentWorkers, event);
            if (targetWorkers >= currentWorkers) {
              logger.debug("scale down NOOP: desiredWorkers same as current workers");
              return;
            }

            final String reason = event.getType() + " with value "
                + String.format(PercentNumberFormat, effectiveValue)
                + " is below scaleDown threshold of " + strategy.getScaleDownBelowPct();
            scaler.scaleDownStage(currentWorkers, targetWorkers, reason);
            lastScaledAt = System.currentTimeMillis();
            logger.info("lastScaledAt set to {} after scale down request", lastScaledAt);
          }
        };
      }