public ScalingResult scale()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java [180:240]


    /**
     * Attempts to apply the parallelism overrides found in {@code deployConfig} by updating the
     * job's vertex resource requirements in place, instead of going through a regular upgrade.
     *
     * <p>Outcomes:
     *
     * <ul>
     *   <li>{@link ScalingResult#CANNOT_SCALE} - {@code supportsInPlaceScaling} rejected the
     *       resource, neither the deploy nor the observe config defines any override, a vertex
     *       that had an override in the observe config no longer has one in the deploy config,
     *       or any error was raised while talking to the cluster.
     *   <li>{@link ScalingResult#ALREADY_SCALED} - every vertex with an override already has the
     *       requested parallelism; nothing is sent.
     *   <li>{@link ScalingResult#SCALING_TRIGGERED} - at least one vertex requirement changed,
     *       the updated requirements were submitted and a {@code Scaling} event was recorded.
     * </ul>
     *
     * @param ctx context providing the resource and its observe configuration
     * @param deployConfig configuration carrying the target parallelism overrides
     * @return the outcome of the in-place scaling attempt, as described above
     * @throws Exception declared by the signature; failures inside the rescale attempt itself
     *     are caught, logged and reported as {@link ScalingResult#CANNOT_SCALE}
     */
    public ScalingResult scale(FlinkResourceContext<?> ctx, Configuration deployConfig)
            throws Exception {
        var resource = ctx.getResource();
        var observeConfig = ctx.getObserveConfig();
        if (!supportsInPlaceScaling(resource, observeConfig)) {
            return ScalingResult.CANNOT_SCALE;
        }

        var targetOverrides = deployConfig.get(PipelineOptions.PARALLELISM_OVERRIDES);
        var activeOverrides = observeConfig.get(PipelineOptions.PARALLELISM_OVERRIDES);
        boolean nothingConfigured = targetOverrides.isEmpty() && activeOverrides.isEmpty();
        if (nothingConfigured) {
            LOG.info("No overrides defined before or after. Cannot scale in-place.");
            return ScalingResult.CANNOT_SCALE;
        }

        try (var client = getClusterClient(observeConfig)) {
            // Work on a private copy so entries can be replaced while iterating.
            var vertexRequirements = new HashMap<>(getVertexResources(client, resource));
            boolean requirementsChanged = false;

            for (var vertexEntry : vertexRequirements.entrySet()) {
                var vertexId = vertexEntry.getKey().toString();
                var currentParallelism = vertexEntry.getValue().getParallelism();
                var requested = targetOverrides.get(vertexId);

                if (requested == null) {
                    // An override that existed before but is gone now cannot be expressed
                    // as an in-place change, so give up on the whole attempt.
                    if (activeOverrides.containsKey(vertexId)) {
                        LOG.info(
                                "Parallelism override for {} has been removed, falling back to regular upgrade.",
                                vertexId);
                        return ScalingResult.CANNOT_SCALE;
                    }
                    // Vertex never had an override: leave its requirements untouched.
                    continue;
                }

                // The override is used as both bounds of the requested parallelism.
                int target = Integer.parseInt(requested);
                var targetParallelism =
                        new JobVertexResourceRequirements.Parallelism(target, target);
                if (currentParallelism.equals(targetParallelism)) {
                    continue;
                }
                vertexEntry.setValue(new JobVertexResourceRequirements(targetParallelism));
                requirementsChanged = true;
            }

            if (!requirementsChanged) {
                LOG.info("Vertex resources requirements already match target, nothing to do...");
                return ScalingResult.ALREADY_SCALED;
            }

            updateVertexResources(client, resource, vertexRequirements);
            eventRecorder.triggerEvent(
                    resource,
                    EventRecorder.Type.Normal,
                    EventRecorder.Reason.Scaling,
                    EventRecorder.Component.Job,
                    "In-place scaling triggered");
            return ScalingResult.SCALING_TRIGGERED;
        } catch (Throwable t) {
            // NOTE(review): deliberately broad so any failure falls back to a regular upgrade;
            // this also swallows Errors and InterruptedException - confirm that is intended.
            LOG.error("Error while rescaling, falling back to regular upgrade", t);
            return ScalingResult.CANNOT_SCALE;
        }
    }