public boolean scalingCompleted()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java [306:349]


    /**
     * Checks whether every job vertex that has a configured parallelism override is currently
     * running with that parallelism.
     *
     * <p>The current parallelisms are read from the job details returned by the cluster client
     * and compared against {@link PipelineOptions#PARALLELISM_OVERRIDES} from the observe
     * config. Vertices without an override entry are ignored. When all overridden vertices
     * match, the reconciliation state of the resource is set to {@link
     * ReconciliationState#DEPLOYED} as a side effect.
     *
     * @param ctx resource context providing the observe config, the resource status and the
     *     Flink service used to create the cluster client
     * @return {@code true} if all vertices with an override run at the overridden parallelism;
     *     {@code false} if the job reports no vertices or at least one vertex still differs
     * @throws RuntimeException wrapping any failure while querying the job or evaluating the
     *     overrides (including a malformed override value); if the thread was interrupted while
     *     waiting for the job details, the interrupt flag is restored before throwing
     */
    public boolean scalingCompleted(FlinkResourceContext<?> ctx) {
        var conf = ctx.getObserveConfig();
        var status = ctx.getResource().getStatus();
        try (var client = ctx.getFlinkService().getClusterClient(conf)) {
            var jobId = JobID.fromHexString(status.getJobStatus().getJobId());
            // Blocks until the job details are available.
            var jobDetailsInfo = client.getJobDetails(jobId).get();

            // Return false on empty jobgraph
            if (jobDetailsInfo.getJobVertexInfos().isEmpty()) {
                return false;
            }

            Map<JobVertexID, Integer> currentParallelisms =
                    jobDetailsInfo.getJobVertexInfos().stream()
                            .collect(
                                    Collectors.toMap(
                                            JobDetailsInfo.JobVertexDetailsInfo::getJobVertexID,
                                            JobDetailsInfo.JobVertexDetailsInfo::getParallelism));

            // Overrides are keyed by the hex string of the vertex id.
            Map<String, String> parallelismOverrides =
                    conf.get(PipelineOptions.PARALLELISM_OVERRIDES);
            for (Map.Entry<JobVertexID, Integer> entry : currentParallelisms.entrySet()) {
                String override = parallelismOverrides.get(entry.getKey().toHexString());
                if (override == null) {
                    // No override defined for this vertex
                    continue;
                }
                Integer overrideParallelism = Integer.valueOf(override);
                if (!overrideParallelism.equals(entry.getValue())) {
                    LOG.info(
                            "Scaling still in progress for vertex {}, {} -> {}",
                            entry.getKey(),
                            entry.getValue(),
                            overrideParallelism);
                    return false;
                }
            }
            LOG.info("All vertexes have successfully scaled");
            status.getReconciliationStatus().setState(ReconciliationState.DEPLOYED);
            return true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption;
            // wrapping alone would silently clear it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }