public boolean scale()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java [175:241]


    public boolean scale(FlinkResourceContext<?> ctx, Configuration deployConfig) throws Exception {
        var resource = ctx.getResource();
        var observeConfig = ctx.getObserveConfig();

        if (!supportsInPlaceScaling(resource, observeConfig)) {
            return false;
        }

        var newOverrides = deployConfig.get(PipelineOptions.PARALLELISM_OVERRIDES);
        var previousOverrides = observeConfig.get(PipelineOptions.PARALLELISM_OVERRIDES);
        if (newOverrides.isEmpty() && previousOverrides.isEmpty()) {
            LOG.info("No overrides defined before or after. Cannot scale in-place.");
            return false;
        }

        try (var client = getClusterClient(observeConfig)) {
            var requirements = new HashMap<>(getVertexResources(client, resource));
            var alreadyScaled = true;

            for (Map.Entry<JobVertexID, JobVertexResourceRequirements> entry :
                    requirements.entrySet()) {
                var jobId = entry.getKey().toString();
                var parallelism = entry.getValue().getParallelism();
                var overrideStr = newOverrides.get(jobId);

                if (overrideStr != null) {
                    // We set the parallelism upper bound to the target parallelism, anything higher
                    // would defeat the purpose of scaling down
                    int upperBound = Integer.parseInt(overrideStr);
                    // We only change the lower bound if the new parallelism went below it. As we
                    // cannot guarantee that new resources can be acquired, increasing the lower
                    // bound to the target could potentially cause job failure.
                    int lowerBound = Math.min(upperBound, parallelism.getLowerBound());
                    var newParallelism =
                            new JobVertexResourceRequirements.Parallelism(lowerBound, upperBound);
                    // If the requirements changed we mark this as scaling triggered
                    if (!parallelism.equals(newParallelism)) {
                        entry.setValue(new JobVertexResourceRequirements(newParallelism));
                        alreadyScaled = false;
                    }
                } else if (previousOverrides.containsKey(jobId)) {
                    LOG.info(
                            "Parallelism override for {} has been removed, falling back to regular upgrade.",
                            jobId);
                    return false;
                } else {
                    // No overrides for this vertex
                }
            }
            if (alreadyScaled) {
                LOG.info("Vertex resources requirements already match target, nothing to do...");
            } else {
                updateVertexResources(client, resource, requirements);
                eventRecorder.triggerEvent(
                        resource,
                        EventRecorder.Type.Normal,
                        EventRecorder.Reason.Scaling,
                        EventRecorder.Component.Job,
                        "In-place scaling triggered",
                        ctx.getKubernetesClient());
            }
            return true;
        } catch (Throwable t) {
            LOG.error("Error while rescaling, falling back to regular upgrade", t);
            return false;
        }
    }