in src/main/java/com/google/cloud/run/kafkascaler/Scaler.java [98:200]
/**
 * Runs a single autoscaling pass for the configured topic and consumer group.
 *
 * <p>The pass:
 *
 * <ol>
 *   <li>verifies that the configured topic exists;
 *   <li>reads the current instance count and the per-partition consumer lag;
 *   <li>computes a lag-based and/or CPU-based recommendation from the configured scaling
 *       metrics;
 *   <li>takes the larger of the active recommendations and caps it at the partition count;
 *   <li>passes it through the scaling stabilizer;
 *   <li>optionally writes scaler metrics;
 *   <li>updates the instance count if it changed and the cooldown period has elapsed.
 * </ol>
 *
 * <p>If no supported scaling metric is configured, a severe message is logged and the method
 * returns without scaling.
 *
 * @throws IllegalArgumentException if the configured topic does not exist
 * @throws IOException propagated from the helper calls made during the pass
 * @throws InterruptedException propagated from the helper calls made during the pass
 * @throws ExecutionException propagated from the helper calls made during the pass
 */
public void scale() throws IOException, InterruptedException, ExecutionException {
  // Single timestamp for this pass; handed to the stabilizer and used when recording a scale
  // event.
  Instant now = Instant.now();
  if (!kafka.doesTopicExist(staticConfig.topicName())) {
    throw new IllegalArgumentException(
        String.format("The specified topic \"%s\" does not exist.", staticConfig.topicName()));
  }

  int currentInstanceCount =
      InstanceCountProvider.getInstanceCount(cloudRunClientWrapper, workloadInfo);
  logger.atInfo().log("[SCALING] Current instances: %d", currentInstanceCount);

  // Current lag should never be empty here because we already checked that the topic exists.
  Map<TopicPartition, Long> lagPerPartition =
      kafka
          .getLagPerPartition(staticConfig.topicName(), staticConfig.consumerGroupId())
          .orElseThrow(() -> new AssertionError("Current lag is empty."));
  long currentLag = lagPerPartition.values().stream().mapToLong(Long::longValue).sum();
  int partitionCount = lagPerPartition.size();

  ScalingConfig scalingConfig = configProvider.scalingConfig();
  Behavior behavior = scalingConfig.spec().behavior();
  ImmutableList<Metric> metrics = scalingConfig.spec().metrics();

  // Pick out the targets for the two supported metrics. If a metric is listed more than once,
  // the last matching entry wins.
  MetricTarget lagTarget = null;
  MetricTarget cpuTarget = null;
  // TODO: Make this more polymorphic.
  for (Metric metric : metrics) {
    if (metric.type() == Metric.Type.RESOURCE
        && metric.resource().name().equals(SCALING_CONFIG_CPU_METRIC_NAME)) {
      cpuTarget = metric.resource().target();
    } else if (metric.type() == Metric.Type.EXTERNAL
        && metric.external().metric().name().equals(SCALING_CONFIG_LAG_METRIC_NAME)) {
      lagTarget = metric.external().target();
    }
  }
  if (lagTarget == null && cpuTarget == null) {
    logger.atSevere().log(
        "[SCALING] No scaling metric configured. At least one scaling metric must be"
            + " configured to enable autoscaling.");
    return;
  }

  LagScaling.Recommendation lagBasedRecommendation = null;
  if (lagTarget != null) {
    lagBasedRecommendation =
        LagScaling.makeRecommendation(lagTarget, currentInstanceCount, currentLag);
  }

  CpuScaling.Recommendation cpuBasedRecommendation = null;
  if (cpuTarget != null) {
    Optional<List<MetricsService.InstanceCountUtilization>> cpuUtilizationData =
        getCpuUtilizationData(cpuTarget.windowSeconds());
    if (cpuUtilizationData.isPresent()) {
      cpuBasedRecommendation = CpuScaling.makeRecommendation(cpuTarget, cpuUtilizationData.get());
    }
  }

  // Use the larger of the active recommendations. An absent or inactive recommendation
  // contributes 0.
  // NOTE(review): if neither recommendation is active (e.g. only CPU is configured and no
  // utilization data was returned), this yields 0 and the stabilizer alone decides the final
  // count. Confirm that this is intended.
  int recommendedInstanceCount =
      max(
          (lagBasedRecommendation != null && lagBasedRecommendation.isActive())
              ? lagBasedRecommendation.recommendedInstanceCount()
              : 0,
          (cpuBasedRecommendation != null && cpuBasedRecommendation.isActive())
              ? cpuBasedRecommendation.recommendedInstanceCount()
              : 0);

  // Cap at the partition count. Within a consumer group each partition is assigned to at most
  // one consumer, so (assuming one consumer per instance) extra instances would sit idle.
  // Log before clamping so the message reports the original, over-limit recommendation.
  if (recommendedInstanceCount > partitionCount) {
    logger.atWarning().log(
        "The recommended number of instances (%d) is greater than the number of partitions"
            + " (%d). The recommendation will be limited to the number of partitions.",
        recommendedInstanceCount, partitionCount);
    recommendedInstanceCount = partitionCount;
  }

  int newInstanceCount =
      scalingStabilizer.getBoundedRecommendation(
          behavior, now, currentInstanceCount, recommendedInstanceCount);
  if (staticConfig.outputScalerMetrics()) {
    writeMetrics(currentLag, recommendedInstanceCount, newInstanceCount);
  }

  if (newInstanceCount == currentInstanceCount) {
    // Skip update request if the number of instances is unchanged.
    logger.atInfo().log("[SCALING] No change in recommended instances (%d)", newInstanceCount);
    return;
  }

  Instant nextUpdateAllowedTime = getNextUpdateAllowedTime(behavior);
  if (Instant.now().isAfter(nextUpdateAllowedTime)) {
    updateInstanceCount(newInstanceCount);
    logger.atInfo().log("[SCALING] Recommended instances: %d", newInstanceCount);
    // Record the event only after the update call returns, so a failed update is not recorded.
    scalingStabilizer.markScaleEvent(behavior, now, currentInstanceCount, newInstanceCount);
  } else {
    // Rate limited due to cooldown period
    logger.atInfo().log(
        "[SCALING] Within cooldown, no change. Next update allowed at: %s",
        nextUpdateAllowedTime);
  }
}