private Optional rateLimitScaleDownRecommendation()

in src/main/java/com/google/cloud/run/kafkascaler/ScalingStabilizer.java [323:379]


  private Optional<Integer> rateLimitScaleDownRecommendation(
      Instant time, int currentInstanceCount, Scaling scaleDown, BoundsDebugInfo debug) {
    BiFunction<Integer, Integer, Integer> comparator = Math::min;
    int scaleDownBound = MAX_INSTANCES;

    if (scaleDown.selectPolicy() == Scaling.SelectPolicy.DISABLED) {
      // Check disabled because this takes priority over empty policies.
      debug.setSelectorDebugString(Scaling.SelectPolicy.DISABLED);
      return Optional.of(currentInstanceCount);
    } else if (scaleDown.policies().isEmpty()) {
      return Optional.empty();
    } else if (scaleDown.selectPolicy() == Scaling.SelectPolicy.MIN) {
      debug.setSelectorDebugString(Scaling.SelectPolicy.MIN);
      comparator = Math::max;
      scaleDownBound = MIN_INSTANCES;
    } else if (scaleDown.selectPolicy() == Scaling.SelectPolicy.MAX) {
      debug.setSelectorDebugString(Scaling.SelectPolicy.MAX);
      comparator = Math::min;
      scaleDownBound = MAX_INSTANCES;
    }

    for (Policy policy : scaleDown.policies()) {
      int bound;
      int instancesAddedInPeriod =
          getInstanceCountChangeForPeriod(time, policy.periodSeconds(), scaleUpEvents);
      int instancesRemovedInPeriod =
          getInstanceCountChangeForPeriod(time, policy.periodSeconds(), scaleDownEvents);
      int periodStartInstances =
          currentInstanceCount - instancesAddedInPeriod + instancesRemovedInPeriod;

      switch (policy.type()) {
        case PERCENT:
          // Round down to ensure we're able to scale down.
          MathContext context = new MathContext(16, RoundingMode.DOWN);
          BigDecimal startInstances = new BigDecimal(periodStartInstances);
          BigDecimal policyValue = new BigDecimal(policy.value());
          bound =
              startInstances
                  .multiply(
                      BigDecimal.ONE.subtract(
                          policyValue.divide(new BigDecimal(100), context), context),
                      context)
                  .intValue();

          break;
        case INSTANCES:
          bound = periodStartInstances - policy.value();
          break;
        default:
          throw new IllegalArgumentException("Unsupported policy type: " + policy.type());
      }
      debug.addScaleDown(policy.type(), bound);
      scaleDownBound = comparator.apply(scaleDownBound, bound);
    }

    return Optional.of(scaleDownBound);
  }