in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java [175:241]
/**
 * Attempts to rescale the running job in place by updating its per-vertex resource requirements
 * on the cluster instead of performing a regular upgrade.
 *
 * <p>For every vertex reported by the cluster that has an entry in the new {@link
 * PipelineOptions#PARALLELISM_OVERRIDES}, the parallelism upper bound is set to the override value
 * and the lower bound is reduced to that value only if it currently exceeds it. Vertices without
 * an override before or after are left untouched. If an override present in the observed config
 * has been removed from the new config, in-place scaling is abandoned.
 *
 * @param ctx resource context providing the resource, the observed config and the Kubernetes
 *     client used for event reporting
 * @param deployConfig the configuration about to be deployed, holding the new parallelism
 *     overrides
 * @return {@code true} if the vertex requirements already matched the target or were updated in
 *     place; {@code false} if in-place scaling is unsupported, not applicable, or failed while
 *     talking to the cluster, in which case the caller should fall back to a regular upgrade
 * @throws Exception if a check performed before contacting the cluster (e.g. {@code
 *     supportsInPlaceScaling}) throws; exceptions raised while talking to the cluster are logged
 *     and reported as {@code false} instead
 */
public boolean scale(FlinkResourceContext<?> ctx, Configuration deployConfig) throws Exception {
    var resource = ctx.getResource();
    var observeConfig = ctx.getObserveConfig();

    if (!supportsInPlaceScaling(resource, observeConfig)) {
        return false;
    }

    var newOverrides = deployConfig.get(PipelineOptions.PARALLELISM_OVERRIDES);
    var previousOverrides = observeConfig.get(PipelineOptions.PARALLELISM_OVERRIDES);
    if (newOverrides.isEmpty() && previousOverrides.isEmpty()) {
        LOG.info("No overrides defined before or after. Cannot scale in-place.");
        return false;
    }

    try (var client = getClusterClient(observeConfig)) {
        // Mutable copy so that individual vertex requirements can be replaced below.
        var requirements = new HashMap<>(getVertexResources(client, resource));
        var alreadyScaled = true;

        for (Map.Entry<JobVertexID, JobVertexResourceRequirements> entry :
                requirements.entrySet()) {
            // Overrides are looked up by the string form of the vertex id.
            var vertexId = entry.getKey().toString();
            var parallelism = entry.getValue().getParallelism();
            var overrideStr = newOverrides.get(vertexId);

            if (overrideStr == null) {
                if (previousOverrides.containsKey(vertexId)) {
                    LOG.info(
                            "Parallelism override for {} has been removed, falling back to regular upgrade.",
                            vertexId);
                    return false;
                }
                // No override for this vertex before or after: keep its current requirements.
                continue;
            }

            // We set the parallelism upper bound to the target parallelism, anything higher
            // would defeat the purpose of scaling down
            int upperBound = Integer.parseInt(overrideStr);
            // We only change the lower bound if the new parallelism went below it. As we
            // cannot guarantee that new resources can be acquired, increasing the lower
            // bound to the target could potentially cause job failure.
            int lowerBound = Math.min(upperBound, parallelism.getLowerBound());
            var newParallelism =
                    new JobVertexResourceRequirements.Parallelism(lowerBound, upperBound);

            // If the requirements changed we mark this as scaling triggered
            if (!parallelism.equals(newParallelism)) {
                entry.setValue(new JobVertexResourceRequirements(newParallelism));
                alreadyScaled = false;
            }
        }

        if (alreadyScaled) {
            LOG.info("Vertex resources requirements already match target, nothing to do...");
        } else {
            updateVertexResources(client, resource, requirements);
            eventRecorder.triggerEvent(
                    resource,
                    EventRecorder.Type.Normal,
                    EventRecorder.Reason.Scaling,
                    EventRecorder.Component.Job,
                    "In-place scaling triggered",
                    ctx.getKubernetesClient());
        }
        return true;
    } catch (Exception e) {
        // Only recoverable exceptions trigger the fallback; fatal JVM Errors propagate.
        if (e instanceof InterruptedException) {
            // Restore the interrupt flag so the caller can still observe the interruption.
            Thread.currentThread().interrupt();
        }
        LOG.error("Error while rescaling, falling back to regular upgrade", e);
        return false;
    }
}