in flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java [68:125]
public void realizeParallelismOverrides(
        Context context, Map<String, String> parallelismOverrides) throws Exception {
    // Applies the given parallelism overrides to a running job in place.
    //
    // context:              supplies the configuration, job ID, job status and REST client.
    // parallelismOverrides: maps a job vertex ID (its toString() form) to the desired
    //                       parallelism, encoded as a decimal integer string.
    //
    // The method is a no-op (with a warning) unless the adaptive scheduler is configured and
    // the job is RUNNING. A NumberFormatException propagates if an override value is not a
    // valid integer; anything thrown by the resource helpers or the client propagates too.
    Configuration conf = context.getConfiguration();

    // Enum constants are compared by identity, consistent with the job status check below.
    // Unlike calling equals() on the configured value, this cannot throw a
    // NullPointerException should the option resolve to null; that case is simply treated
    // as "not adaptive".
    if (conf.get(JobManagerOptions.SCHEDULER) != JobManagerOptions.SchedulerType.Adaptive) {
        LOG.warn("In-place rescaling is only available with the adaptive scheduler.");
        return;
    }

    var jobID = context.getJobID();
    if (JobStatus.RUNNING != context.getJobStatus()) {
        LOG.warn("Job in terminal or reconciling state cannot be scaled in-place.");
        return;
    }

    var flinkRestClientTimeout = conf.get(AutoScalerOptions.FLINK_CLIENT_TIMEOUT);

    // The client is closed automatically when this block exits, normally or exceptionally.
    try (var client = context.getRestClusterClient()) {
        // Work on a private mutable copy so entries can be replaced while iterating.
        var requirements =
                new HashMap<>(getVertexResources(client, jobID, flinkRestClientTimeout));
        var parallelismUpdated = false;

        for (Map.Entry<JobVertexID, JobVertexResourceRequirements> entry :
                requirements.entrySet()) {
            var overrideStr = parallelismOverrides.get(entry.getKey().toString());
            // No overrides for this vertex
            if (overrideStr == null) {
                continue;
            }

            // We have an override for the vertex.
            // NOTE(review): the two arguments are presumably (lowerBound, upperBound), i.e.
            // the vertex may run anywhere between 1 and the override - confirm against
            // JobVertexResourceRequirements.Parallelism.
            var newParallelism =
                    new JobVertexResourceRequirements.Parallelism(
                            1, Integer.parseInt(overrideStr));

            // If the requirements changed we mark this as scaling triggered
            var currentParallelism = entry.getValue().getParallelism();
            if (!currentParallelism.equals(newParallelism)) {
                entry.setValue(new JobVertexResourceRequirements(newParallelism));
                parallelismUpdated = true;
            }
        }

        if (parallelismUpdated) {
            // Submit the full requirements map (changed and unchanged vertices alike) and
            // report the rescale through the event handler.
            updateVertexResources(client, jobID, flinkRestClientTimeout, requirements);
            eventHandler.handleEvent(
                    context,
                    AutoScalerEventHandler.Type.Normal,
                    SCALING,
                    String.format(
                            "In-place scaling triggered, the new requirements is %s.",
                            requirements),
                    null,
                    null);
        } else {
            LOG.info("Vertex resources requirements already match target, nothing to do...");
        }
    }
}