protected static int scale()

in flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java [236:280]


    /**
     * Computes the new parallelism for a vertex from its current parallelism and a scale factor.
     *
     * <p>The scaled value {@code ceil(scaleFactor * parallelism)} is clamped to the range {@code
     * [max(1, minParallelism), min(numKeyGroups, maxParallelism)]}; if the two bounds conflict,
     * the upper bound wins. Starting from the clamped value, the method then searches upwards for
     * the first parallelism that divides {@code numKeyGroups} without a remainder, looking no
     * further than {@code numKeyGroups / 2} and the upper bound. If no such divisor is found, the
     * clamped value is returned unchanged.
     *
     * <p>The call is rejected through {@code Preconditions.checkArgument} when {@code
     * minParallelism} is greater than {@code maxParallelism}.
     *
     * @param parallelism the current parallelism
     * @param numKeyGroups the number of key groups (referred to as the operator max parallelism in
     *     the log messages)
     * @param scaleFactor the factor the current parallelism is multiplied with
     * @param minParallelism the configured lower limit for the result
     * @param maxParallelism the configured upper limit for the result
     * @return the new parallelism; at least 1 whenever the upper bound is at least 1
     */
    protected static int scale(
            int parallelism,
            int numKeyGroups,
            double scaleFactor,
            int minParallelism,
            int maxParallelism) {
        Preconditions.checkArgument(
                minParallelism <= maxParallelism,
                "The minimum parallelism must not be greater than the maximum parallelism.");
        if (minParallelism > numKeyGroups) {
            LOG.warn(
                    "Specified autoscaler minimum parallelism {} is greater than the operator max parallelism {}. The min parallelism will be set to the operator max parallelism.",
                    minParallelism,
                    numKeyGroups);
        }
        if (numKeyGroups < maxParallelism && maxParallelism != Integer.MAX_VALUE) {
            LOG.debug(
                    "Specified autoscaler maximum parallelism {} is greater than the operator max parallelism {}. This means the operator max parallelism can never be reached.",
                    maxParallelism,
                    numKeyGroups);
        }

        int newParallelism =
                // Prevent integer overflow when converting from double to integer.
                // Negative or NaN products need no special handling here: the cast yields a
                // non-positive value that the lower bound applied below absorbs.
                (int) Math.min(Math.ceil(scaleFactor * parallelism), Integer.MAX_VALUE);

        // Cap parallelism at either number of key groups or parallelism limit
        final int upperBound = Math.min(numKeyGroups, maxParallelism);

        // Never go below one: a non-positive candidate is not a usable parallelism and a
        // candidate of zero would make the modulo in the divisor search throw.
        final int lowerBound = Math.max(1, minParallelism);

        // Apply min/max parallelism; the upper bound takes precedence over the lower bound
        newParallelism = Math.min(Math.max(lowerBound, newParallelism), upperBound);

        // Try to adjust the parallelism such that it divides the number of key groups without a
        // remainder => state is evenly spread across subtasks. The p > 0 guard keeps the search
        // from ever dividing by zero when the upper bound itself is non-positive.
        for (int p = newParallelism; p > 0 && p <= numKeyGroups / 2 && p <= upperBound; p++) {
            if (numKeyGroups % p == 0) {
                return p;
            }
        }

        // If key group adjustment fails, use originally computed parallelism
        return newParallelism;
    }