private Optional<Integer> rateLimitScaleUpRecommendation()

in src/main/java/com/google/cloud/run/kafkascaler/ScalingStabilizer.java [232:289]


  /**
   * Computes the ceiling that the scale-up policies place on the instance count.
   *
   * <p>Every policy contributes one bound, derived from the instance count at the start of that
   * policy's period: the current count, minus the change reported for {@code scaleUpEvents} over
   * the period, plus the change reported for {@code scaleDownEvents}. A {@code PERCENT} policy
   * grows that starting count by the policy's percentage (rounded up); an {@code INSTANCES}
   * policy adds the policy's value to it.
   *
   * <p>The per-policy bounds are folded together according to the select policy: {@code MIN}
   * keeps the smallest bound, starting from {@code MAX_INSTANCES}; any other (non-disabled)
   * select policy keeps the largest bound, starting from 0.
   *
   * @param time the instant handed to {@code getInstanceCountChangeForPeriod} for each policy
   * @param currentInstanceCount the instance count the bounds are computed relative to
   * @param scaleUp the scale-up configuration (select policy and list of policies)
   * @param debug receives the chosen select policy and every per-policy bound
   * @return {@code currentInstanceCount} when the select policy is {@code DISABLED}; an empty
   *     {@code Optional} when scale-up is not disabled and has no policies; otherwise the
   *     combined bound
   * @throws IllegalArgumentException if a policy's type is neither {@code PERCENT} nor {@code
   *     INSTANCES}
   */
  private Optional<Integer> rateLimitScaleUpRecommendation(
      Instant time, int currentInstanceCount, Scaling scaleUp, BoundsDebugInfo debug) {
    // DISABLED is tested ahead of the empty-policy case because it takes priority over it.
    if (scaleUp.selectPolicy() == Scaling.SelectPolicy.DISABLED) {
      debug.setSelectorDebugString(Scaling.SelectPolicy.DISABLED);
      return Optional.of(currentInstanceCount);
    }
    if (scaleUp.policies().isEmpty()) {
      return Optional.empty();
    }

    // Anything other than MIN selects the largest change; only MIN and MAX are reported to debug.
    final boolean keepSmallest = scaleUp.selectPolicy() == Scaling.SelectPolicy.MIN;
    if (keepSmallest) {
      debug.setSelectorDebugString(Scaling.SelectPolicy.MIN);
    } else if (scaleUp.selectPolicy() == Scaling.SelectPolicy.MAX) {
      debug.setSelectorDebugString(Scaling.SelectPolicy.MAX);
    }

    // Seed with the identity for the chosen fold: the cap for min-selection, zero for max.
    int limit = keepSmallest ? MAX_INSTANCES : 0;

    for (Policy policy : scaleUp.policies()) {
      int added = getInstanceCountChangeForPeriod(time, policy.periodSeconds(), scaleUpEvents);
      int removed = getInstanceCountChangeForPeriod(time, policy.periodSeconds(), scaleDownEvents);
      // Undo the period's recorded changes to recover the count at the start of the period.
      int baseline = currentInstanceCount - added + removed;

      int candidate;
      switch (policy.type()) {
        case PERCENT: {
          // Every step rounds up so a percentage policy can always make progress.
          MathContext roundUp = new MathContext(16, RoundingMode.UP);
          BigDecimal fraction =
              BigDecimal.valueOf(policy.value()).divide(BigDecimal.valueOf(100), roundUp);
          BigDecimal growthFactor = BigDecimal.ONE.add(fraction, roundUp);
          BigDecimal grown = BigDecimal.valueOf(baseline).multiply(growthFactor, roundUp);
          candidate = grown.setScale(0, RoundingMode.UP).intValue();
          break;
        }
        case INSTANCES: {
          candidate = baseline + policy.value();
          break;
        }
        default:
          throw new IllegalArgumentException("Unsupported policy type: " + policy.type());
      }

      debug.addScaleUp(policy.type(), candidate);
      limit = keepSmallest ? Math.min(limit, candidate) : Math.max(limit, candidate);
    }

    return Optional.of(limit);
  }