in flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java [157:250]
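/**
 * Computes the target parallelism for a single job vertex from its evaluated scaling
 * metrics. Returns {@link ParallelismChange#noChange()} when a required metric is
 * unavailable or when the computed parallelism equals the current parallelism.
 * (Descriptive summary added by the editor; the original excerpt starts at the signature.)
 */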
public ParallelismChange computeScaleTargetParallelism(
        Context context,
        JobVertexID vertex,
        Collection<ShipStrategy> inputShipStrategies,
        Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
        SortedMap<Instant, ScalingSummary> history,
        Duration restartTime,
        DelayedScaleDown delayedScaleDown) {
    var conf = context.getConfiguration();
    var currentParallelism = (int) evaluatedMetrics.get(PARALLELISM).getCurrent();
    double averageTrueProcessingRate = evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
    if (Double.isNaN(averageTrueProcessingRate)) {
        LOG.warn(
                "True processing rate is not available for {}, cannot compute new parallelism",
                vertex);
        return ParallelismChange.noChange();
    }
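    // Determine the capacity the vertex must sustain after rescaling, based on the
    // configured utilization target and the expected restart time.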
    double targetCapacity =
            AutoScalerUtils.getTargetProcessingCapacity(
                    evaluatedMetrics, conf, conf.get(UTILIZATION_TARGET), true, restartTime);
    if (Double.isNaN(targetCapacity)) {
        LOG.warn(
                "Target data rate is not available for {}, cannot compute new parallelism",
                vertex);
        return ParallelismChange.noChange();
    }
LOG.debug("Target processing capacity for {} is {}", vertex, targetCapacity);
    double scaleFactor = targetCapacity / averageTrueProcessingRate;
    if (conf.get(OBSERVED_SCALABILITY_ENABLED)) {
        double scalingCoefficient =
                JobVertexScaler.calculateObservedScalingCoefficient(history, conf);
        scaleFactor = scaleFactor / scalingCoefficient;
    }
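    // Clamp the factor so a single scaling decision never changes the vertex by more
    // than the configured maximum scale-up/scale-down ratio.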
    double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
    double maxScaleFactor = 1 + conf.get(MAX_SCALE_UP_FACTOR);
    if (scaleFactor < minScaleFactor) {
        LOG.debug(
                "Computed scale factor of {} for {} is capped by maximum scale down factor to {}",
                scaleFactor,
                vertex,
                minScaleFactor);
        scaleFactor = minScaleFactor;
    } else if (scaleFactor > maxScaleFactor) {
        LOG.debug(
                "Computed scale factor of {} for {} is capped by maximum scale up factor to {}",
                scaleFactor,
                vertex,
                maxScaleFactor);
        scaleFactor = maxScaleFactor;
    }
    // Cap the target capacity according to the capped scale factor.
    double cappedTargetCapacity = averageTrueProcessingRate * scaleFactor;
    LOG.debug("Capped target processing capacity for {} is {}", vertex, cappedTargetCapacity);
    int newParallelism =
            scale(
                    vertex,
                    currentParallelism,
                    inputShipStrategies,
                    (int) evaluatedMetrics.get(NUM_SOURCE_PARTITIONS).getCurrent(),
                    (int) evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
                    scaleFactor,
                    Math.min(currentParallelism, conf.getInteger(VERTEX_MIN_PARALLELISM)),
                    Math.max(currentParallelism, conf.getInteger(VERTEX_MAX_PARALLELISM)),
                    autoScalerEventHandler,
                    context);
    if (newParallelism == currentParallelism) {
        // The vertex stays at its current parallelism, so clear any pending delayed
        // scale down request for it and report no change.
        delayedScaleDown.clearVertex(vertex);
        return ParallelismChange.noChange();
    }
    // Record the expected processing rate for this scaling operation so that its
    // effectiveness can be evaluated later.
    evaluatedMetrics.put(
            ScalingMetric.EXPECTED_PROCESSING_RATE,
            EvaluatedScalingMetric.of(cappedTargetCapacity));
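    // detectBlockScaling decides whether the proposed change is applied now, delayed
    // (scale downs can be held back for a grace period via delayedScaleDown), or
    // blocked (e.g. when past scalings of this vertex were deemed ineffective).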
    return detectBlockScaling(
            context,
            vertex,
            conf,
            evaluatedMetrics,
            history,
            currentParallelism,
            newParallelism,
            delayedScaleDown);
}
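
For orientation, here is a minimal, hypothetical call-site sketch. It is not the actual caller (in the real code path the scaling executor evaluates the full metric set first); the variable names, the example restart time, and the use of ShipStrategy.HASH below are assumptions for illustration only.

// Hypothetical usage sketch, not the real call site. Assumes evaluatedMetrics has
// already been populated by the metric evaluator with PARALLELISM, MAX_PARALLELISM,
// NUM_SOURCE_PARTITIONS, TRUE_PROCESSING_RATE and the rate metrics.
ParallelismChange change =
        jobVertexScaler.computeScaleTargetParallelism(
                context,                    // autoscaler context for this job
                vertexId,                   // JobVertexID of the vertex being scaled
                List.of(ShipStrategy.HASH), // ship strategies of the vertex's inputs
                evaluatedMetrics,           // Map<ScalingMetric, EvaluatedScalingMetric>
                scalingHistory,             // SortedMap<Instant, ScalingSummary>
                Duration.ofMinutes(5),      // estimated restart time of the job
                delayedScaleDown);          // tracker for deferred scale downs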