in flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java [236:280]
protected static int scale(
int parallelism,
int numKeyGroups,
double scaleFactor,
int minParallelism,
int maxParallelism) {
Preconditions.checkArgument(
minParallelism <= maxParallelism,
"The minimum parallelism must not be greater than the maximum parallelism.");
if (minParallelism > numKeyGroups) {
LOG.warn(
"Specified autoscaler minimum parallelism {} is greater than the operator max parallelism {}. The min parallelism will be set to the operator max parallelism.",
minParallelism,
numKeyGroups);
}
if (numKeyGroups < maxParallelism && maxParallelism != Integer.MAX_VALUE) {
LOG.debug(
"Specified autoscaler maximum parallelism {} is greater than the operator max parallelism {}. This means the operator max parallelism can never be reached.",
maxParallelism,
numKeyGroups);
}
int newParallelism =
// Prevent integer overflow when converting from double to integer.
// We do not have to detect underflow because doubles cannot
// underflow.
(int) Math.min(Math.ceil(scaleFactor * parallelism), Integer.MAX_VALUE);
// Cap parallelism at either number of key groups or parallelism limit
final int upperBound = Math.min(numKeyGroups, maxParallelism);
// Apply min/max parallelism
newParallelism = Math.min(Math.max(minParallelism, newParallelism), upperBound);
// Try to adjust the parallelism such that it divides the number of key groups without a
// remainder => state is evenly spread across subtasks
for (int p = newParallelism; p <= numKeyGroups / 2 && p <= upperBound; p++) {
if (numKeyGroups % p == 0) {
return p;
}
}
// If key group adjustment fails, use originally computed parallelism
return newParallelism;
}