public void apply()

in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/controller/autoscalar/AutoScalar.java [141:211]


  public void apply(RebalancingJobGroup rebalancingJobGroup, double defaultScale) {
    if (!leaderSelector.isLeader()) {
      logger.debug("skipped apply because current instance is not leader");
      throw new IllegalArgumentException("Not Leader");
    }

    JobState state = rebalancingJobGroup.getJobGroupState();
    JobGroup jobGroup = rebalancingJobGroup.getJobGroup();
    if (state != JobState.JOB_STATE_RUNNING) {
      logger.debug(
          "skipped apply because jobGroup is not running",
          StructuredLogging.jobGroupId(jobGroup.getJobGroupId()),
          StructuredLogging.kafkaTopic(jobGroup.getKafkaConsumerTaskGroup().getTopic()),
          StructuredLogging.kafkaCluster(jobGroup.getKafkaConsumerTaskGroup().getCluster()),
          StructuredLogging.kafkaGroup(jobGroup.getKafkaConsumerTaskGroup().getConsumerGroup()));
      return;
    }

    JobGroupKey jobGroupKey = JobGroupKey.of(jobGroup);
    final SignatureAndScale quota = new SignatureAndScale(jobGroup.getFlowControl());
    final SignatureAndScale signatureAndScale;
    final Optional<Double> scale = rebalancingJobGroup.getScale();
    if (scale.isEmpty()) {
      signatureAndScale = quota;
      logger.info(
          String.format("Initialize job group scale state with quota"),
          StructuredLogging.kafkaTopic(jobGroupKey.getTopic()),
          StructuredLogging.kafkaCluster(jobGroupKey.getCluster()),
          StructuredLogging.kafkaGroup(jobGroupKey.getGroup()));
    } else {
      signatureAndScale = new SignatureAndScale(scale.get(), jobGroup.getFlowControl());
    }
    double newScale =
        statusStore
            .asMap()
            .computeIfAbsent(
                jobGroupKey, key -> new JobGroupScaleStatus(jobGroupKey, signatureAndScale))
            .getScale(quota, jobGroup.getMiscConfig().getScaleResetEnabled());

    scope
        .tagged(
            StructuredTags.builder()
                .setKafkaGroup(jobGroup.getKafkaConsumerTaskGroup().getConsumerGroup())
                .setKafkaTopic(jobGroup.getKafkaConsumerTaskGroup().getTopic())
                .build())
        .gauge(MetricNames.AUTOSCALAR_COMPUTED_SCALE)
        .update(newScale);

    // use default scale in dryRun mode
    if (config.isDryRun()) {
      newScale = defaultScale;
    }

    scope
        .tagged(
            StructuredTags.builder()
                .setKafkaGroup(jobGroup.getKafkaConsumerTaskGroup().getConsumerGroup())
                .setKafkaTopic(jobGroup.getKafkaConsumerTaskGroup().getTopic())
                .build())
        .gauge(MetricNames.AUTOSCALAR_APPLIED_SCALE)
        .update(newScale);

    if (rebalancingJobGroup.updateScale(newScale, scaleToThroughput(newScale))) {
      logger.info(
          String.format("update jobGroup scale to %.2f", newScale),
          StructuredLogging.jobGroupId(jobGroup.getJobGroupId()),
          StructuredLogging.kafkaTopic(jobGroup.getKafkaConsumerTaskGroup().getTopic()),
          StructuredLogging.kafkaCluster(jobGroup.getKafkaConsumerTaskGroup().getCluster()),
          StructuredLogging.kafkaGroup(jobGroup.getKafkaConsumerTaskGroup().getConsumerGroup()));
    }
  }