public void scale()

in src/main/java/com/google/cloud/run/kafkascaler/Scaler.java [98:200]


  /**
   * Runs one autoscaling pass for the configured Kafka topic and consumer group.
   *
   * <p>The pass proceeds as follows:
   *
   * <ol>
   *   <li>Verifies that the configured topic exists.
   *   <li>Reads the current instance count and the per-partition consumer lag, and sums the lag.
   *   <li>Looks up the CPU and lag metric targets in the current scaling config. If neither is
   *       configured, logs a severe message and returns without scaling.
   *   <li>Builds a recommendation for each configured target and takes the larger of the active
   *       ones (0 when none is active).
   *   <li>Caps the recommendation at the number of entries in the per-partition lag map.
   *   <li>Passes the capped value through the scaling stabilizer to obtain the new instance count.
   *   <li>Optionally writes scaler metrics, then applies the new count unless it is unchanged or
   *       the next allowed update time has not yet passed.
   * </ol>
   *
   * @throws IllegalArgumentException if the configured topic does not exist
   * @throws AssertionError if no lag information is returned for an existing topic
   * @throws IOException declared by this method; presumably propagated from the collaborators it
   *     calls (the exact source is not visible here)
   * @throws InterruptedException declared by this method; presumably propagated from a blocking
   *     collaborator call (the exact source is not visible here)
   * @throws ExecutionException declared by this method; presumably propagated from an asynchronous
   *     collaborator call (the exact source is not visible here)
   */
  public void scale() throws IOException, InterruptedException, ExecutionException {
    // Captured once so the stabilizer sees the same timestamp for bounding and for marking the
    // scale event.
    Instant now = Instant.now();

    if (!kafka.doesTopicExist(staticConfig.topicName())) {
      throw new IllegalArgumentException(
          String.format("The specified topic \"%s\" does not exist.", staticConfig.topicName()));
    }

    int currentInstanceCount =
        InstanceCountProvider.getInstanceCount(cloudRunClientWrapper, workloadInfo);
    logger.atInfo().log("[SCALING] Current instances: %d", currentInstanceCount);

    // Current lag should never be empty here because we already checked that the topic exists.
    Map<TopicPartition, Long> lagPerPartition =
        kafka
            .getLagPerPartition(staticConfig.topicName(), staticConfig.consumerGroupId())
            .orElseThrow(() -> new AssertionError("Current lag is empty."));

    long currentLag = lagPerPartition.values().stream().mapToLong(Long::longValue).sum();

    ScalingConfig scalingConfig = configProvider.scalingConfig();
    Behavior behavior = scalingConfig.spec().behavior();
    ImmutableList<Metric> metrics = scalingConfig.spec().metrics();

    MetricTarget lagTarget = null;
    MetricTarget cpuTarget = null;

    // TODO: Make this more polymorphic.
    // If several metrics match the same name, the last one in the list wins.
    for (Metric metric : metrics) {
      if (metric.type() == Metric.Type.RESOURCE
          && metric.resource().name().equals(SCALING_CONFIG_CPU_METRIC_NAME)) {
        cpuTarget = metric.resource().target();
      } else if (metric.type() == Metric.Type.EXTERNAL
          && metric.external().metric().name().equals(SCALING_CONFIG_LAG_METRIC_NAME)) {
        lagTarget = metric.external().target();
      }
    }

    if (lagTarget == null && cpuTarget == null) {
      logger.atSevere().log(
          "[SCALING] No scaling metric configured. At least one scaling metric must be"
              + " configured to enable autoscaling.");
      return;
    }

    LagScaling.Recommendation lagBasedRecommendation = null;
    if (lagTarget != null) {
      lagBasedRecommendation =
          LagScaling.makeRecommendation(lagTarget, currentInstanceCount, currentLag);
    }

    // The CPU recommendation stays null when no utilization data is available for the window.
    CpuScaling.Recommendation cpuBasedRecommendation = null;
    if (cpuTarget != null) {
      Optional<List<MetricsService.InstanceCountUtilization>> cpuUtilizationData =
          getCpuUtilizationData(cpuTarget.windowSeconds());
      if (cpuUtilizationData.isPresent()) {
        cpuBasedRecommendation = CpuScaling.makeRecommendation(cpuTarget, cpuUtilizationData.get());
      }
    }

    // A missing or inactive recommendation contributes 0, so the larger active one wins.
    int recommendedInstanceCount =
        max(
            (lagBasedRecommendation != null && lagBasedRecommendation.isActive())
                ? lagBasedRecommendation.recommendedInstanceCount()
                : 0,
            (cpuBasedRecommendation != null && cpuBasedRecommendation.isActive())
                ? cpuBasedRecommendation.recommendedInstanceCount()
                : 0);

    int partitionCount = lagPerPartition.size();
    if (recommendedInstanceCount > partitionCount) {
      // Log before clamping so the message reports the original (uncapped) recommendation rather
      // than the partition count twice.
      logger.atWarning().log(
          "The recommended number of instances (%d) is greater than the number of partitions"
              + " (%d). The recommendation will be limited to the number of partitions.",
          recommendedInstanceCount, partitionCount);
      recommendedInstanceCount = partitionCount;
    }

    int newInstanceCount =
        scalingStabilizer.getBoundedRecommendation(
            behavior, now, currentInstanceCount, recommendedInstanceCount);

    if (staticConfig.outputScalerMetrics()) {
      writeMetrics(currentLag, recommendedInstanceCount, newInstanceCount);
    }

    if (newInstanceCount == currentInstanceCount) {
      // Skip update request if the number of instances is unchanged.
      logger.atInfo().log("[SCALING] No change in recommended instances (%d)", newInstanceCount);
      return;
    }

    Instant nextUpdateAllowedTime = getNextUpdateAllowedTime(behavior);
    // Compared against a fresh clock reading, not the `now` captured at the start of the pass.
    if (Instant.now().isAfter(nextUpdateAllowedTime)) {
      updateInstanceCount(newInstanceCount);
      logger.atInfo().log("[SCALING] Recommended instances: %d", newInstanceCount);
      scalingStabilizer.markScaleEvent(behavior, now, currentInstanceCount, newInstanceCount);
    } else {
      // Rate limited due to cooldown period
      logger.atInfo().log(
          "[SCALING] Within cooldown, no change. Next update allowed at: %s",
          nextUpdateAllowedTime);
    }
  }