void waitForClusterShutdown()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java [872:922]


    /**
     * Polls Kubernetes until the JobManager pods, TaskManager pods and the external service of the
     * given cluster are all gone, or until the timeout is exhausted.
     *
     * <p>The check runs at most {@code shutdownTimeout} times with a one second sleep between
     * attempts, so the timeout is effectively expressed in seconds (time spent in the Kubernetes
     * API calls themselves is not accounted for). Once a resource kind has been observed as
     * absent it is not queried again.
     *
     * <p>Exhausting the timeout is not treated as an error: a warning is logged and the method
     * returns normally.
     *
     * @param namespace namespace the cluster resources live in
     * @param clusterId cluster id used to look up the pods and the external service
     * @param shutdownTimeout maximum number of one-second polling rounds; a value {@code <= 0}
     *     skips polling entirely
     * @throws RuntimeException wrapping the {@link InterruptedException} if the waiting thread is
     *     interrupted; the thread's interrupt flag is restored before throwing
     */
    void waitForClusterShutdown(String namespace, String clusterId, long shutdownTimeout) {
        LOG.info("Waiting for cluster shutdown...");

        boolean jobManagerRunning = true;
        boolean taskManagerRunning = true;
        boolean serviceRunning = true;

        // Use a long counter: an int would wrap around before reaching a timeout larger than
        // Integer.MAX_VALUE and the loop would never terminate.
        for (long i = 0; i < shutdownTimeout; i++) {
            if (jobManagerRunning) {
                PodList jmPodList = getJmPodList(namespace, clusterId);

                if (jmPodList == null || jmPodList.getItems().isEmpty()) {
                    jobManagerRunning = false;
                }
            }
            if (taskManagerRunning) {
                PodList tmPodList = getTmPodList(namespace, clusterId);

                // Same null tolerance as for the JobManager pod list above.
                if (tmPodList == null || tmPodList.getItems().isEmpty()) {
                    taskManagerRunning = false;
                }
            }

            if (serviceRunning) {
                Service service =
                        kubernetesClient
                                .services()
                                .inNamespace(namespace)
                                .withName(
                                        ExternalServiceDecorator.getExternalServiceName(clusterId))
                                .get();
                if (service == null) {
                    serviceRunning = false;
                }
            }

            if (!jobManagerRunning && !serviceRunning && !taskManagerRunning) {
                // Only report completion when every resource was actually observed as gone.
                LOG.info("Cluster shutdown completed.");
                return;
            }
            // Emit a progress message every 5 polling rounds (roughly every 5 seconds).
            if ((i + 1) % 5 == 0) {
                LOG.info("Waiting for cluster shutdown... ({}s)", i + 1);
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt status so callers further up the stack can still see it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        // Timeout exhausted (or no polling requested): do not claim the shutdown completed.
        LOG.warn(
                "Cluster shutdown was not confirmed within {}s (jobManagerRunning={}, taskManagerRunning={}, serviceRunning={}).",
                shutdownTimeout,
                jobManagerRunning,
                taskManagerRunning,
                serviceRunning);
    }