in src/main/java/com/google/cloud/run/kafkascaler/ScalingStabilizer.java [382:428]
/**
 * Clamps the current instance count into the range spanned by recent recommendations.
 *
 * <p>The lower bound is the smallest of {@code recommendedInstances} and every stored
 * recommendation timestamped strictly after the scale-up cutoff. The upper bound is the largest
 * of {@code recommendedInstances} and every stored recommendation timestamped strictly after the
 * scale-down cutoff. Both cutoffs come from {@code getStabilizationCutoff} applied to {@code time}
 * and the corresponding side of {@code behavior}.
 *
 * <p>Side effect: stored recommendations timestamped strictly before both cutoffs are removed
 * from {@code recommendations} while scanning. The scan stops at the first stored recommendation
 * timestamped after {@code time}, so later entries are neither counted nor pruned.
 *
 * @param behavior supplies the scale-up and scale-down settings used to derive the two cutoffs
 * @param time the instant this evaluation is made for
 * @param currentInstanceCount the value that gets clamped into the stabilization range
 * @param recommendedInstances the new recommendation; seeds both bounds of the range
 * @param debug receives the scale-up bound when the recommendation is above the current count, or
 *     the scale-down bound when it is below; nothing is recorded when they are equal
 * @return an {@code Optional} that always holds the clamped instance count
 */
private Optional<Integer> stabilizeRecommendation(
    Behavior behavior,
    Instant time,
    int currentInstanceCount,
    int recommendedInstances,
    BoundsDebugInfo debug) {
  final Instant upCutoff = getStabilizationCutoff(time, behavior.scaleUp());
  final Instant downCutoff = getStabilizationCutoff(time, behavior.scaleDown());

  // Both bounds start at the fresh recommendation and can only widen from there.
  int lowerBound = recommendedInstances;
  int upperBound = recommendedInstances;

  for (Iterator<DataPoint> it = recommendations.iterator(); it.hasNext(); ) {
    DataPoint sample = it.next();
    Instant sampledAt = sample.timestamp();

    if (sampledAt.isAfter(time)) {
      // Relies on the stored recommendations being ordered by time: nothing further along can
      // fall inside either window.
      break;
    }

    if (sampledAt.isBefore(upCutoff) && sampledAt.isBefore(downCutoff)) {
      // Older than both windows, so it can never influence a bound again; drop it.
      it.remove();
      continue;
    }

    if (sampledAt.isAfter(upCutoff)) {
      lowerBound = min(lowerBound, sample.value());
    }
    if (sampledAt.isAfter(downCutoff)) {
      upperBound = max(upperBound, sample.value());
    }
  }

  // Record the bound that applies to the direction the new recommendation points in.
  if (currentInstanceCount < recommendedInstances) {
    debug.addScaleUp(Policy.Type.STABILIZATION, lowerBound);
  } else if (currentInstanceCount > recommendedInstances) {
    debug.addScaleDown(Policy.Type.STABILIZATION, upperBound);
  }

  // Raise to the lower bound first, then cap at the upper bound.
  int stabilized = min(max(currentInstanceCount, lowerBound), upperBound);
  return Optional.of(stabilized);
}