public void realizeParallelismOverrides()

in flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java [68:125]


    public void realizeParallelismOverrides(
            Context context, Map<String, String> parallelismOverrides) throws Exception {
        Configuration conf = context.getConfiguration();
        if (!conf.get(JobManagerOptions.SCHEDULER)
                .equals(JobManagerOptions.SchedulerType.Adaptive)) {
            LOG.warn("In-place rescaling is only available with the adaptive scheduler.");
            return;
        }

        var jobID = context.getJobID();
        if (JobStatus.RUNNING != context.getJobStatus()) {
            LOG.warn("Job in terminal or reconciling state cannot be scaled in-place.");
            return;
        }

        var flinkRestClientTimeout = conf.get(AutoScalerOptions.FLINK_CLIENT_TIMEOUT);

        try (var client = context.getRestClusterClient()) {
            var requirements =
                    new HashMap<>(getVertexResources(client, jobID, flinkRestClientTimeout));
            var parallelismUpdated = false;

            for (Map.Entry<JobVertexID, JobVertexResourceRequirements> entry :
                    requirements.entrySet()) {
                var jobVertexId = entry.getKey().toString();
                var parallelism = entry.getValue().getParallelism();
                var overrideStr = parallelismOverrides.get(jobVertexId);

                // No overrides for this vertex
                if (overrideStr == null) {
                    continue;
                }

                // We have an override for the vertex
                var p = Integer.parseInt(overrideStr);
                var newParallelism = new JobVertexResourceRequirements.Parallelism(1, p);
                // If the requirements changed we mark this as scaling triggered
                if (!parallelism.equals(newParallelism)) {
                    entry.setValue(new JobVertexResourceRequirements(newParallelism));
                    parallelismUpdated = true;
                }
            }
            if (parallelismUpdated) {
                updateVertexResources(client, jobID, flinkRestClientTimeout, requirements);
                eventHandler.handleEvent(
                        context,
                        AutoScalerEventHandler.Type.Normal,
                        SCALING,
                        String.format(
                                "In-place scaling triggered, the new requirements is %s.",
                                requirements),
                        null,
                        null);
            } else {
                LOG.info("Vertex resources requirements already match target, nothing to do...");
            }
        }
    }