in flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java [68:143]
/**
 * Computes the parallelism a job vertex should be scaled to, based on its evaluated metrics.
 *
 * <p>The scale factor is the ratio of the target processing capacity to the vertex's average
 * true processing rate. It is clamped to {@code [1 - MAX_SCALE_DOWN_FACTOR, 1 +
 * MAX_SCALE_UP_FACTOR]} and then turned into a parallelism via {@code scale(...)}.
 *
 * <p>The current parallelism is returned unchanged in any of these cases:
 *
 * <ul>
 *   <li>the true processing rate or the target capacity is NaN;
 *   <li>their ratio is NaN;
 *   <li>the computed parallelism equals the current one;
 *   <li>{@code blockScalingBasedOnPastActions} returns {@code true}.
 * </ul>
 *
 * <p>Side effect: when a different parallelism is returned, an {@code
 * ScalingMetric.EXPECTED_PROCESSING_RATE} entry holding the capped target capacity is put into
 * {@code evaluatedMetrics}.
 *
 * @param resource the resource being autoscaled; forwarded to the past-actions check
 * @param conf autoscaler configuration
 * @param vertex the vertex being evaluated; used in log messages and the past-actions check
 * @param evaluatedMetrics evaluated metrics for the vertex. Expected to hold PARALLELISM,
 *     TRUE_PROCESSING_RATE and MAX_PARALLELISM entries. Mutated when a new parallelism is
 *     returned.
 * @param history scaling summaries keyed by time; forwarded to the past-actions check
 * @return the new parallelism, or the current parallelism if no change should be made
 */
public int computeScaleTargetParallelism(
        AbstractFlinkResource<?, ?> resource,
        Configuration conf,
        JobVertexID vertex,
        Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
        SortedMap<Instant, ScalingSummary> history) {
    var currentParallelism = (int) evaluatedMetrics.get(PARALLELISM).getCurrent();
    double averageTrueProcessingRate = evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
    if (Double.isNaN(averageTrueProcessingRate)) {
        LOG.warn(
                "True processing rate is not available for {}, cannot compute new parallelism",
                vertex);
        return currentParallelism;
    }

    double targetCapacity =
            AutoScalerUtils.getTargetProcessingCapacity(
                    evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION), true);
    if (Double.isNaN(targetCapacity)) {
        LOG.warn(
                "Target data rate is not available for {}, cannot compute new parallelism",
                vertex);
        return currentParallelism;
    }

    LOG.debug("Target processing capacity for {} is {}", vertex, targetCapacity);
    double scaleFactor = targetCapacity / averageTrueProcessingRate;
    // 0 / 0 yields NaN. NaN compares false against both caps below, so without this guard it
    // would bypass the max scale-down/up limits and flow uncapped into scale(...).
    if (Double.isNaN(scaleFactor)) {
        LOG.warn(
                "Scale factor is undefined for {} (target capacity {}, true processing rate {}), cannot compute new parallelism",
                vertex,
                targetCapacity,
                averageTrueProcessingRate);
        return currentParallelism;
    }

    double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
    double maxScaleFactor = 1 + conf.get(MAX_SCALE_UP_FACTOR);
    if (scaleFactor < minScaleFactor) {
        LOG.debug(
                "Computed scale factor of {} for {} is capped by maximum scale down factor to {}",
                scaleFactor,
                vertex,
                minScaleFactor);
        scaleFactor = minScaleFactor;
    } else if (scaleFactor > maxScaleFactor) {
        LOG.debug(
                "Computed scale factor of {} for {} is capped by maximum scale up factor to {}",
                scaleFactor,
                vertex,
                maxScaleFactor);
        scaleFactor = maxScaleFactor;
    }

    // Cap target capacity according to the capped scale factor
    double cappedTargetCapacity = averageTrueProcessingRate * scaleFactor;
    LOG.debug("Capped target processing capacity for {} is {}", vertex, cappedTargetCapacity);

    // The configured min/max bounds are widened so that they always include the current
    // parallelism. NOTE(review): presumably this keeps a vertex already outside the configured
    // range from being forced into it by this call alone; confirm intent.
    int newParallelism =
            scale(
                    currentParallelism,
                    (int) evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
                    scaleFactor,
                    Math.min(currentParallelism, conf.get(VERTEX_MIN_PARALLELISM)),
                    Math.max(currentParallelism, conf.get(VERTEX_MAX_PARALLELISM)));

    if (newParallelism == currentParallelism
            || blockScalingBasedOnPastActions(
                    resource,
                    vertex,
                    conf,
                    evaluatedMetrics,
                    history,
                    currentParallelism,
                    newParallelism)) {
        return currentParallelism;
    }

    // We record our expectations for this scaling operation
    evaluatedMetrics.put(
            ScalingMetric.EXPECTED_PROCESSING_RATE,
            EvaluatedScalingMetric.of(cappedTargetCapacity));
    return newParallelism;
}