in src/main/java/com/google/cloud/run/kafkascaler/LagScaling.java [53:78]
/**
 * Derives an instance-count recommendation from the current lag.
 *
 * <p>When {@code currentLag} does not exceed {@code lagTarget.activationThreshold()}, scaling is
 * reported as inactive via {@code new Recommendation(false, 0)}. Otherwise the lag is compared
 * against the inclusive band from {@code (1 - tolerance) * averageValue} to
 * {@code (1 + tolerance) * averageValue}:
 *
 * <ul>
 *   <li>inside the band, the current instance count is returned unchanged (including 0);
 *   <li>outside the band, the count is rescaled by {@code currentLag / averageValue} and rounded
 *       up, using at least 1 as the base count so a zero count can still scale up.
 * </ul>
 *
 * <p>NOTE(review): an {@code averageValue} of zero produces a non-finite ratio here; presumably
 * the target is validated before it reaches this method -- confirm against the caller.
 *
 * @param lagTarget supplies the activation threshold, target average value and tolerance
 * @param currentInstanceCount the instance count the recommendation is based on
 * @param currentLag the lag value to evaluate against the target
 * @return {@code Recommendation(false, 0)} when at or below the activation threshold, otherwise
 *     {@code Recommendation(true, count)} with the unchanged or rescaled instance count
 */
public static Recommendation makeRecommendation(
    MetricTarget lagTarget, int currentInstanceCount, long currentLag) {
  // Guard: at or below the activation threshold, lag-based scaling does not apply.
  if (currentLag <= lagTarget.activationThreshold()) {
    logger.atInfo().log(
        "[LAG] Scaling inactive, current lag: %d, activation lag threshold: %d",
        currentLag, lagTarget.activationThreshold());
    return new Recommendation(false, 0);
  }

  // Ratio of observed lag to the target; values above 1 mean more capacity is needed.
  double lagRatio = currentLag / (double) lagTarget.averageValue();
  double upperBound = (1 + lagTarget.tolerance()) * lagTarget.averageValue();
  double lowerBound = (1 - lagTarget.tolerance()) * lagTarget.averageValue();
  logger.atInfo().log(
      "[LAG] Current Lag: %d, upper tolerance: %.2f, lower tolerance: %.2f\n",
      currentLag, upperBound, lowerBound);

  // Kept as a conjunction (not rewritten with De Morgan) so that NaN bounds still fall through
  // to the rescaling path below.
  boolean withinTolerance = lowerBound <= currentLag && currentLag <= upperBound;
  if (withinTolerance) {
    logger.atInfo().log("[LAG] Within tolerance, no change");
    return new Recommendation(true, currentInstanceCount);
  }

  // Scale from at least one instance so that a zero count does not pin the result at zero.
  int baseInstanceCount = max(currentInstanceCount, 1);
  double scaledInstanceCount = baseInstanceCount * lagRatio;
  int roundedInstanceCount = (int) Math.ceil(scaledInstanceCount);
  logger.atInfo().log("[LAG] Recommended instance count: %d", roundedInstanceCount);
  return new Recommendation(true, roundedInstanceCount);
}