in flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java [179:242]
Map<JobVertexID, ScalingSummary> computeScalingSummary(
        Context context,
        EvaluatedMetrics evaluatedMetrics,
        Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory,
        Duration restartTime,
        JobTopology jobTopology,
        DelayedScaleDown delayedScaleDown) {
    LOG.debug("Restart time used in scaling summary computation: {}", restartTime);
    if (isJobUnderMemoryPressure(context, evaluatedMetrics.getGlobalMetrics())) {
        LOG.info("Skipping vertex scaling due to memory pressure");
        return Map.of();
    }

    var out = new HashMap<JobVertexID, ScalingSummary>();
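
    // Vertices listed in the vertex.exclude.ids option are never rescaled.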
    var excludeVertexIdList =
            context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
    AtomicBoolean anyVertexOutsideBound = new AtomicBoolean(false);
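    // For each evaluated vertex, ask the JobVertexScaler for a target
    // parallelism and collect the proposed changes.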
    evaluatedMetrics
            .getVertexMetrics()
            .forEach(
                    (v, metrics) -> {
                        if (excludeVertexIdList.contains(v.toHexString())) {
                            LOG.debug(
                                    "Vertex {} is part of `vertex.exclude.ids` config, ignoring it for scaling",
                                    v);
                        } else {
                            var currentParallelism =
                                    (int) metrics.get(ScalingMetric.PARALLELISM).getCurrent();
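
                            // The target parallelism is derived from the vertex's
                            // metrics, its inputs, its scaling history and the
                            // observed restart time.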
                            var parallelismChange =
                                    jobVertexScaler.computeScaleTargetParallelism(
                                            context,
                                            v,
                                            jobTopology.get(v).getInputs().values(),
                                            metrics,
                                            scalingHistory.getOrDefault(
                                                    v, Collections.emptySortedMap()),
                                            restartTime,
                                            delayedScaleDown);
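                            // Nothing to record if the scaler proposes no change.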
                            if (parallelismChange.isNoChange()) {
                                return;
                            }
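                            // Remember whether any vertex actually breached its
                            // utilization bounds; if none did, the round is
                            // discarded after the loop.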
                            if (parallelismChange.isOutsideUtilizationBound()) {
                                anyVertexOutsideBound.set(true);
                            }
                            out.put(
                                    v,
                                    new ScalingSummary(
                                            currentParallelism,
                                            parallelismChange.getNewParallelism(),
                                            metrics));
                        }
                    });

    // If the utilization of all tasks is within range, we can skip scaling.
    if (!anyVertexOutsideBound.get()) {
        LOG.info("All vertex processing rates are within target.");
        return Map.of();
    }
    return out;
}
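
For reference, the exclusion list consulted above comes from AutoScalerOptions.VERTEX_EXCLUDE_IDS, which holds JobVertexID hex strings. Below is a minimal sketch of populating that option programmatically; the import path assumes the flink-autoscaler module layout, and the vertex ID is a hypothetical placeholder:

import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.configuration.Configuration;

import java.util.List;

public class ExcludeVertexExample {
    public static void main(String[] args) {
        var conf = new Configuration();
        // Entries are matched against JobVertexID#toHexString() in
        // computeScalingSummary, so they must be 32-character hex IDs.
        // The ID below is a hypothetical placeholder.
        conf.set(
                AutoScalerOptions.VERTEX_EXCLUDE_IDS,
                List.of("bc764cd8ddf7a0cff126f51c16239658"));
        System.out.println(conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS));
    }
}

In a real deployment this is usually set through the job configuration (the option's key carries the job.autoscaler. prefix) rather than in code.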