in flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java [96:163]
public boolean scaleResource(
Context context,
EvaluatedMetrics evaluatedMetrics,
Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory,
ScalingTracking scalingTracking,
Instant now,
JobTopology jobTopology,
DelayedScaleDown delayedScaleDown)
throws Exception {
var conf = context.getConfiguration();
var restartTime = scalingTracking.getMaxRestartTimeOrDefault(conf);
var scalingSummaries =
computeScalingSummary(
context,
evaluatedMetrics,
scalingHistory,
restartTime,
jobTopology,
delayedScaleDown);
if (scalingSummaries.isEmpty()) {
LOG.info("All job vertices are currently running at their target parallelism.");
return false;
}
updateRecommendedParallelism(evaluatedMetrics.getVertexMetrics(), scalingSummaries);
if (checkIfBlockedAndTriggerScalingEvent(context, scalingSummaries, conf, now)) {
return false;
}
var configOverrides =
MemoryTuning.tuneTaskManagerMemory(
context,
evaluatedMetrics,
jobTopology,
scalingSummaries,
autoScalerEventHandler);
var memoryTuningEnabled = conf.get(AutoScalerOptions.MEMORY_TUNING_ENABLED);
if (scalingWouldExceedMaxResources(
memoryTuningEnabled ? configOverrides.newConfigWithOverrides(conf) : conf,
jobTopology,
evaluatedMetrics,
scalingSummaries,
context)) {
return false;
}
addToScalingHistoryAndStore(
autoScalerStateStore, context, scalingHistory, now, scalingSummaries);
scalingTracking.addScalingRecord(now, new ScalingRecord());
autoScalerStateStore.storeScalingTracking(context, scalingTracking);
autoScalerStateStore.storeParallelismOverrides(
context,
getVertexParallelismOverrides(
evaluatedMetrics.getVertexMetrics(), scalingSummaries));
autoScalerStateStore.storeConfigChanges(context, configOverrides);
// Try to clear all delayed scale down requests after scaling.
delayedScaleDown.clearAll();
return true;
}