private void runScalingLogic()

in flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java [180:242]


    private void runScalingLogic(Context ctx, AutoscalerFlinkMetrics autoscalerMetrics)
            throws Exception {

        var collectedMetrics = metricsCollector.updateMetrics(ctx, stateStore);
        var jobTopology = collectedMetrics.getJobTopology();

        var now = clock.instant();
        var scalingTracking = getTrimmedScalingTracking(stateStore, ctx, now);
        var scalingHistory = getTrimmedScalingHistory(stateStore, ctx, now);
        // A scaling tracking without an end time gets created whenever a scaling decision is
        // applied. Here, we record the end time for it (runScalingLogic is only called when the job
        // transitions back into the RUNNING state).
        if (scalingTracking.recordRestartDurationIfTrackedAndParallelismMatches(
                collectedMetrics.getJobRunningTs(), jobTopology, scalingHistory)) {
            stateStore.storeScalingTracking(ctx, scalingTracking);
        }

        // We require at least 2 metric collections for evaluation as required by rate computations
        // from accumulated metrics
        if (collectedMetrics.getMetricHistory().size() < 2) {
            return;
        }
        LOG.debug("Collected metrics: {}", collectedMetrics);

        // Scaling tracking data contains previous restart times that are taken into account
        var restartTime = scalingTracking.getMaxRestartTimeOrDefault(ctx.getConfiguration());
        var evaluatedMetrics =
                evaluator.evaluate(ctx.getConfiguration(), collectedMetrics, restartTime);
        LOG.debug("Evaluated metrics: {}", evaluatedMetrics);
        lastEvaluatedMetrics.put(ctx.getJobKey(), evaluatedMetrics);

        initRecommendedParallelism(evaluatedMetrics.getVertexMetrics());
        autoscalerMetrics.registerScalingMetrics(
                jobTopology.getVerticesInTopologicalOrder(),
                () -> lastEvaluatedMetrics.get(ctx.getJobKey()));

        if (!collectedMetrics.isFullyCollected()) {
            // We have done an upfront evaluation, but we are not ready for scaling.
            resetRecommendedParallelism(evaluatedMetrics.getVertexMetrics());
            return;
        }

        var delayedScaleDown = stateStore.getDelayedScaleDown(ctx);
        var parallelismChanged =
                scalingExecutor.scaleResource(
                        ctx,
                        evaluatedMetrics,
                        scalingHistory,
                        scalingTracking,
                        now,
                        jobTopology,
                        delayedScaleDown);

        if (delayedScaleDown.isUpdated()) {
            stateStore.storeDelayedScaleDown(ctx, delayedScaleDown);
        }

        if (parallelismChanged) {
            autoscalerMetrics.incrementScaling();
        } else {
            autoscalerMetrics.incrementBalanced();
        }
    }