public void apply()

in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/controller/autoscalar/AutoScalar.java [110:176]


  public void apply(RebalancingJobGroup rebalancingJobGroup, double defaultScale) {
    if (!leaderSelector.isLeader()) {
      logger.debug("skipped apply because current instance is not leader");
      throw new IllegalArgumentException("Not Leader");
    }

    JobState state = rebalancingJobGroup.getJobGroupState();
    JobGroup jobGroup = rebalancingJobGroup.getJobGroup();
    if (state != JobState.JOB_STATE_RUNNING) {
      logger.debug(
          "skipped apply because jobGroup is not running",
          StructuredLogging.jobGroupId(jobGroup.getJobGroupId()),
          StructuredLogging.kafkaTopic(jobGroup.getKafkaConsumerTaskGroup().getTopic()),
          StructuredLogging.kafkaCluster(jobGroup.getKafkaConsumerTaskGroup().getCluster()),
          StructuredLogging.kafkaGroup(jobGroup.getKafkaConsumerTaskGroup().getConsumerGroup()));
      return;
    }

    JobGroupKey jobGroupKey = JobGroupKey.of(jobGroup);
    final SignatureAndScale quota = new SignatureAndScale(jobGroup.getFlowControl());
    final Optional<Double> scale = rebalancingJobGroup.getScale();
    double newScale =
        autoScalarStatusStore
            .asMap()
            .computeIfAbsent(
                jobGroupKey,
                key ->
                    new JobGroupScaleStatus(scale.isPresent() ? quota.build(scale.get()) : quota))
            .getScale(quota, jobGroup.getMiscConfig().getScaleResetEnabled());

    scope
        .tagged(
            StructuredTags.builder()
                .setKafkaGroup(jobGroup.getKafkaConsumerTaskGroup().getConsumerGroup())
                .setKafkaTopic(jobGroup.getKafkaConsumerTaskGroup().getTopic())
                .build())
        .gauge(MetricNames.AUTOSCALAR_COMPUTED_SCALE)
        .update(newScale);

    // use default scale in dryRun mode
    // TODO: remove hardcoded topic group after root cause fixed see KAFEP-2386
    if (config.isDryRun()
        || (jobGroupKey.getGroup().equals("fulfillment-indexing-gateway")
            && jobGroupKey
                .getTopic()
                .equals("fulfillment-raw-transport-provider-session-state-changes"))) {
      newScale = defaultScale;
    }

    scope
        .tagged(
            StructuredTags.builder()
                .setKafkaGroup(jobGroup.getKafkaConsumerTaskGroup().getConsumerGroup())
                .setKafkaTopic(jobGroup.getKafkaConsumerTaskGroup().getTopic())
                .build())
        .gauge(MetricNames.AUTOSCALAR_APPLIED_SCALE)
        .update(newScale);

    if (rebalancingJobGroup.updateScale(newScale)) {
      logger.info(
          String.format("update jobGroup scale to %.2f", newScale),
          StructuredLogging.jobGroupId(jobGroup.getJobGroupId()),
          StructuredLogging.kafkaTopic(jobGroup.getKafkaConsumerTaskGroup().getTopic()),
          StructuredLogging.kafkaCluster(jobGroup.getKafkaConsumerTaskGroup().getCluster()),
          StructuredLogging.kafkaGroup(jobGroup.getKafkaConsumerTaskGroup().getConsumerGroup()));
    }
  }