in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java [180:240]
/**
 * Tries to apply the parallelism overrides from {@code deployConfig} to the running job in
 * place, by rewriting the per-vertex resource requirements through the cluster client instead
 * of performing a regular upgrade.
 *
 * @param ctx resource context providing the resource and its currently observed configuration
 * @param deployConfig configuration to be deployed, carrying the target parallelism overrides
 * @return {@code CANNOT_SCALE} when in-place scaling is not supported, when no overrides are
 *     defined before or after, when a previously set override was removed, or when any error
 *     occurs while talking to the cluster; {@code ALREADY_SCALED} when the current vertex
 *     requirements already match the overrides; {@code SCALING_TRIGGERED} when updated
 *     requirements were submitted
 * @throws Exception declared by the signature; failures inside the rescale attempt itself are
 *     caught, logged and reported as {@code CANNOT_SCALE}
 */
public ScalingResult scale(FlinkResourceContext<?> ctx, Configuration deployConfig)
        throws Exception {
    var resource = ctx.getResource();
    var observeConfig = ctx.getObserveConfig();

    if (!supportsInPlaceScaling(resource, observeConfig)) {
        return ScalingResult.CANNOT_SCALE;
    }

    var newOverrides = deployConfig.get(PipelineOptions.PARALLELISM_OVERRIDES);
    var previousOverrides = observeConfig.get(PipelineOptions.PARALLELISM_OVERRIDES);
    if (newOverrides.isEmpty() && previousOverrides.isEmpty()) {
        LOG.info("No overrides defined before or after. Cannot scale in-place.");
        return ScalingResult.CANNOT_SCALE;
    }

    try (var client = getClusterClient(observeConfig)) {
        // Work on a mutable copy so entries can be replaced while iterating.
        var requirements = new HashMap<>(getVertexResources(client, resource));
        var result = ScalingResult.ALREADY_SCALED;

        for (Map.Entry<JobVertexID, JobVertexResourceRequirements> entry :
                requirements.entrySet()) {
            // Overrides are keyed by the string form of the job vertex id.
            var vertexId = entry.getKey().toString();
            var currentParallelism = entry.getValue().getParallelism();
            var overrideStr = newOverrides.get(vertexId);

            if (overrideStr != null) {
                // We have an override for the vertex
                int p = Integer.parseInt(overrideStr);
                // Only the upper bound carries the target. The lower bound stays at 1 so the
                // job can keep running on the slots it already has while additional resources
                // are acquired; a lower bound of p would block it until all p slots exist.
                var newParallelism = new JobVertexResourceRequirements.Parallelism(1, p);
                // If the requirements changed we mark this as scaling triggered
                if (!currentParallelism.equals(newParallelism)) {
                    entry.setValue(new JobVertexResourceRequirements(newParallelism));
                    result = ScalingResult.SCALING_TRIGGERED;
                }
            } else if (previousOverrides.containsKey(vertexId)) {
                // Removing an override cannot be expressed as a requirements update here.
                LOG.info(
                        "Parallelism override for {} has been removed, falling back to regular upgrade.",
                        vertexId);
                return ScalingResult.CANNOT_SCALE;
            }
            // Otherwise: no override for this vertex before or after, leave it untouched.
        }

        if (result == ScalingResult.ALREADY_SCALED) {
            LOG.info("Vertex resources requirements already match target, nothing to do...");
        } else {
            updateVertexResources(client, resource, requirements);
            eventRecorder.triggerEvent(
                    resource,
                    EventRecorder.Type.Normal,
                    EventRecorder.Reason.Scaling,
                    EventRecorder.Component.Job,
                    "In-place scaling triggered");
        }
        return result;
    } catch (Throwable t) {
        // Deliberate best-effort: any failure (including a malformed override value) makes
        // the caller fall back to a regular upgrade rather than failing reconciliation.
        LOG.error("Error while rescaling, falling back to regular upgrade", t);
        return ScalingResult.CANNOT_SCALE;
    }
}