in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/controller/autoscalar/AutoScalar.java [141:211]
/**
 * Computes a scale for the given job group, reports it as metrics, and writes it back to
 * {@code rebalancingJobGroup}.
 *
 * <p>The call is a no-op (only a debug log is emitted) when this instance is not the leader or
 * when the job group state is not {@code JOB_STATE_RUNNING}.
 *
 * <p>Otherwise the scale is obtained from the {@code JobGroupScaleStatus} stored in
 * {@code statusStore} for this job group's key. If no entry exists yet, one is created, seeded
 * from the scale currently held by {@code rebalancingJobGroup}, or from the job group's flow
 * control quota when no scale is present. In dry-run mode the computed scale is still reported
 * through the computed-scale gauge, but {@code defaultScale} is what gets applied.
 *
 * @param rebalancingJobGroup the job group whose scale is read and updated
 * @param defaultScale the scale applied instead of the computed one when {@code config} is in
 *     dry-run mode
 */
public void apply(RebalancingJobGroup rebalancingJobGroup, double defaultScale) {
  if (!leaderSelector.isLeader()) {
    // Skip quietly, like the not-running guard below: not being the leader is an expected
    // condition here, not an invalid argument, so it must not surface as an exception.
    logger.debug("skipped apply because current instance is not leader");
    return;
  }
  JobState state = rebalancingJobGroup.getJobGroupState();
  JobGroup jobGroup = rebalancingJobGroup.getJobGroup();
  if (state != JobState.JOB_STATE_RUNNING) {
    logger.debug(
        "skipped apply because jobGroup is not running",
        StructuredLogging.jobGroupId(jobGroup.getJobGroupId()),
        StructuredLogging.kafkaTopic(jobGroup.getKafkaConsumerTaskGroup().getTopic()),
        StructuredLogging.kafkaCluster(jobGroup.getKafkaConsumerTaskGroup().getCluster()),
        StructuredLogging.kafkaGroup(jobGroup.getKafkaConsumerTaskGroup().getConsumerGroup()));
    return;
  }
  JobGroupKey jobGroupKey = JobGroupKey.of(jobGroup);
  final SignatureAndScale quota = new SignatureAndScale(jobGroup.getFlowControl());
  final SignatureAndScale signatureAndScale;
  final Optional<Double> scale = rebalancingJobGroup.getScale();
  if (scale.isEmpty()) {
    // No scale recorded on the job group yet: seed the status from the quota.
    signatureAndScale = quota;
    logger.info(
        "Initialize job group scale state with quota",
        StructuredLogging.kafkaTopic(jobGroupKey.getTopic()),
        StructuredLogging.kafkaCluster(jobGroupKey.getCluster()),
        StructuredLogging.kafkaGroup(jobGroupKey.getGroup()));
  } else {
    signatureAndScale = new SignatureAndScale(scale.get(), jobGroup.getFlowControl());
  }
  // The seed value is only used when the store has no status for this key yet.
  double newScale =
      statusStore
          .asMap()
          .computeIfAbsent(
              jobGroupKey, key -> new JobGroupScaleStatus(jobGroupKey, signatureAndScale))
          .getScale(quota, jobGroup.getMiscConfig().getScaleResetEnabled());
  // Both gauges carry the same consumer group / topic tags, so build the tagged scope once.
  final var taggedScope =
      scope.tagged(
          StructuredTags.builder()
              .setKafkaGroup(jobGroup.getKafkaConsumerTaskGroup().getConsumerGroup())
              .setKafkaTopic(jobGroup.getKafkaConsumerTaskGroup().getTopic())
              .build());
  taggedScope.gauge(MetricNames.AUTOSCALAR_COMPUTED_SCALE).update(newScale);
  // use default scale in dryRun mode
  if (config.isDryRun()) {
    newScale = defaultScale;
  }
  taggedScope.gauge(MetricNames.AUTOSCALAR_APPLIED_SCALE).update(newScale);
  // Log only when updateScale reports true.
  if (rebalancingJobGroup.updateScale(newScale, scaleToThroughput(newScale))) {
    logger.info(
        String.format("update jobGroup scale to %.2f", newScale),
        StructuredLogging.jobGroupId(jobGroup.getJobGroupId()),
        StructuredLogging.kafkaTopic(jobGroup.getKafkaConsumerTaskGroup().getTopic()),
        StructuredLogging.kafkaCluster(jobGroup.getKafkaConsumerTaskGroup().getCluster()),
        StructuredLogging.kafkaGroup(jobGroup.getKafkaConsumerTaskGroup().getConsumerGroup()));
  }
}