in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/controller/autoscalar/AutoScalar.java [110:176]
public void apply(RebalancingJobGroup rebalancingJobGroup, double defaultScale) {
if (!leaderSelector.isLeader()) {
logger.debug("skipped apply because current instance is not leader");
throw new IllegalArgumentException("Not Leader");
}
JobState state = rebalancingJobGroup.getJobGroupState();
JobGroup jobGroup = rebalancingJobGroup.getJobGroup();
if (state != JobState.JOB_STATE_RUNNING) {
logger.debug(
"skipped apply because jobGroup is not running",
StructuredLogging.jobGroupId(jobGroup.getJobGroupId()),
StructuredLogging.kafkaTopic(jobGroup.getKafkaConsumerTaskGroup().getTopic()),
StructuredLogging.kafkaCluster(jobGroup.getKafkaConsumerTaskGroup().getCluster()),
StructuredLogging.kafkaGroup(jobGroup.getKafkaConsumerTaskGroup().getConsumerGroup()));
return;
}
JobGroupKey jobGroupKey = JobGroupKey.of(jobGroup);
final SignatureAndScale quota = new SignatureAndScale(jobGroup.getFlowControl());
final Optional<Double> scale = rebalancingJobGroup.getScale();
double newScale =
autoScalarStatusStore
.asMap()
.computeIfAbsent(
jobGroupKey,
key ->
new JobGroupScaleStatus(scale.isPresent() ? quota.build(scale.get()) : quota))
.getScale(quota, jobGroup.getMiscConfig().getScaleResetEnabled());
scope
.tagged(
StructuredTags.builder()
.setKafkaGroup(jobGroup.getKafkaConsumerTaskGroup().getConsumerGroup())
.setKafkaTopic(jobGroup.getKafkaConsumerTaskGroup().getTopic())
.build())
.gauge(MetricNames.AUTOSCALAR_COMPUTED_SCALE)
.update(newScale);
// use default scale in dryRun mode
// TODO: remove hardcoded topic group after root cause fixed see KAFEP-2386
if (config.isDryRun()
|| (jobGroupKey.getGroup().equals("fulfillment-indexing-gateway")
&& jobGroupKey
.getTopic()
.equals("fulfillment-raw-transport-provider-session-state-changes"))) {
newScale = defaultScale;
}
scope
.tagged(
StructuredTags.builder()
.setKafkaGroup(jobGroup.getKafkaConsumerTaskGroup().getConsumerGroup())
.setKafkaTopic(jobGroup.getKafkaConsumerTaskGroup().getTopic())
.build())
.gauge(MetricNames.AUTOSCALAR_APPLIED_SCALE)
.update(newScale);
if (rebalancingJobGroup.updateScale(newScale)) {
logger.info(
String.format("update jobGroup scale to %.2f", newScale),
StructuredLogging.jobGroupId(jobGroup.getJobGroupId()),
StructuredLogging.kafkaTopic(jobGroup.getKafkaConsumerTaskGroup().getTopic()),
StructuredLogging.kafkaCluster(jobGroup.getKafkaConsumerTaskGroup().getCluster()),
StructuredLogging.kafkaGroup(jobGroup.getKafkaConsumerTaskGroup().getConsumerGroup()));
}
}