in flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java [180:242]
/**
 * Runs a single autoscaling pass for the given job context.
 *
 * <p>The pass refreshes the collected metrics, records the restart duration of a pending
 * scaling (if one is tracked), evaluates the metrics and publishes them for metric reporting,
 * and finally, once the metrics are fully collected, hands the evaluation over to the scaling
 * executor.
 *
 * @param ctx the job context the pass is executed for
 * @param autoscalerMetrics metric handle used to register the evaluated scaling metrics and to
 *     count scaling / balanced outcomes
 * @throws Exception propagated from the invoked collaborators
 */
private void runScalingLogic(Context ctx, AutoscalerFlinkMetrics autoscalerMetrics)
        throws Exception {
    var collected = metricsCollector.updateMetrics(ctx, stateStore);
    var topology = collected.getJobTopology();
    var currentTime = clock.instant();
    var tracking = getTrimmedScalingTracking(stateStore, ctx, currentTime);
    var history = getTrimmedScalingHistory(stateStore, ctx, currentTime);

    // Applying a scaling decision creates a scaling tracking that has no end time yet. Since
    // runScalingLogic is only invoked once the job is back in the RUNNING state, this is the
    // place where that end time gets recorded and persisted.
    var restartDurationRecorded =
            tracking.recordRestartDurationIfTrackedAndParallelismMatches(
                    collected.getJobRunningTs(), topology, history);
    if (restartDurationRecorded) {
        stateStore.storeScalingTracking(ctx, tracking);
    }

    // Rate computations from accumulated metrics need at least 2 metric collections, so there
    // is nothing to evaluate before that.
    var enoughCollections = collected.getMetricHistory().size() >= 2;
    if (!enoughCollections) {
        return;
    }
    LOG.debug("Collected metrics: {}", collected);

    // Previous restart times kept in the scaling tracking feed into the evaluation.
    var restartTime = tracking.getMaxRestartTimeOrDefault(ctx.getConfiguration());
    var evaluated = evaluator.evaluate(ctx.getConfiguration(), collected, restartTime);
    LOG.debug("Evaluated metrics: {}", evaluated);

    lastEvaluatedMetrics.put(ctx.getJobKey(), evaluated);
    initRecommendedParallelism(evaluated.getVertexMetrics());
    autoscalerMetrics.registerScalingMetrics(
            topology.getVerticesInTopologicalOrder(),
            () -> lastEvaluatedMetrics.get(ctx.getJobKey()));

    if (!collected.isFullyCollected()) {
        // The evaluation above was done upfront; scaling itself has to wait until the
        // metrics are fully collected.
        resetRecommendedParallelism(evaluated.getVertexMetrics());
        return;
    }

    var pendingScaleDown = stateStore.getDelayedScaleDown(ctx);
    var rescaled =
            scalingExecutor.scaleResource(
                    ctx, evaluated, history, tracking, currentTime, topology, pendingScaleDown);

    // Persist the delayed scale-down state only when the executor reports it as updated.
    if (pendingScaleDown.isUpdated()) {
        stateStore.storeDelayedScaleDown(ctx, pendingScaleDown);
    }

    if (!rescaled) {
        autoscalerMetrics.incrementBalanced();
        return;
    }
    autoscalerMetrics.incrementScaling();
}