in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java [306:349]
public boolean scalingCompleted(FlinkResourceContext<?> ctx) {
    var conf = ctx.getObserveConfig();
    var status = ctx.getResource().getStatus();
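    // Query the running job through the cluster's REST client to read the actual
    // parallelism of every vertex.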
    try (var client = ctx.getFlinkService().getClusterClient(conf)) {
        var jobId = JobID.fromHexString(status.getJobStatus().getJobId());
        var jobDetailsInfo = client.getJobDetails(jobId).get();
        // Return false on empty jobgraph
        if (jobDetailsInfo.getJobVertexInfos().isEmpty()) {
            return false;
        }
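        // Current parallelism of each vertex as reported by the running job.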
        Map<JobVertexID, Integer> currentParallelisms =
                jobDetailsInfo.getJobVertexInfos().stream()
                        .collect(
                                Collectors.toMap(
                                        JobDetailsInfo.JobVertexDetailsInfo::getJobVertexID,
                                        JobDetailsInfo.JobVertexDetailsInfo::getParallelism));
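        // Desired parallelisms configured via pipeline.jobvertex-parallelism-overrides,
        // keyed by the vertex id in hex form.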
        Map<String, String> parallelismOverrides =
                conf.get(PipelineOptions.PARALLELISM_OVERRIDES);
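        // A vertex whose actual parallelism still differs from its override is not done rescaling.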
        for (Map.Entry<JobVertexID, Integer> entry : currentParallelisms.entrySet()) {
            String override = parallelismOverrides.get(entry.getKey().toHexString());
            if (override == null) {
                // No override defined for this vertex
                continue;
            }
            Integer overrideParallelism = Integer.valueOf(override);
            if (!overrideParallelism.equals(entry.getValue())) {
                LOG.info(
                        "Scaling still in progress for vertex {}, {} -> {}",
                        entry.getKey(),
                        entry.getValue(),
                        overrideParallelism);
                return false;
            }
        }
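        // Every overridden vertex already runs at its target parallelism, so the
        // in-place scaling operation is complete and the resource is fully deployed again.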
LOG.info("All vertexes have successfully scaled");
status.getReconciliationStatus().setState(ReconciliationState.DEPLOYED);
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
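
The method above compares the live vertex parallelisms against the map stored under PipelineOptions.PARALLELISM_OVERRIDES. The following is a minimal, self-contained sketch (not part of NativeFlinkService.java; the class name and the vertex ids are placeholders for illustration) of how such an override map can be written to and read back from a Flink Configuration:

import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;

public class ParallelismOverrideExample {

    public static void main(String[] args) {
        // Desired per-vertex parallelisms, keyed by the vertex id in hex form,
        // the same representation scalingCompleted() looks up via toHexString().
        // The ids below are placeholders, not real vertex ids.
        Map<String, String> overrides =
                Map.of(
                        "bc764cd8ddf7a0cff126f51c16239658", "4",
                        "0a448493b4782967b150582570326227", "2");

        Configuration conf = new Configuration();
        conf.set(PipelineOptions.PARALLELISM_OVERRIDES, overrides);

        // scalingCompleted() reads the map back the same way and compares each entry
        // against the parallelism reported for the running job's vertices.
        Map<String, String> readBack = conf.get(PipelineOptions.PARALLELISM_OVERRIDES);
        readBack.forEach(
                (vertexId, parallelism) ->
                        System.out.println(vertexId + " -> " + parallelism));
    }
}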