in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java [872:922]
void waitForClusterShutdown(String namespace, String clusterId, long shutdownTimeout) {
LOG.info("Waiting for cluster shutdown...");
boolean jobManagerRunning = true;
boolean taskManagerRunning = true;
boolean serviceRunning = true;
for (int i = 0; i < shutdownTimeout; i++) {
if (jobManagerRunning) {
PodList jmPodList = getJmPodList(namespace, clusterId);
if (jmPodList == null || jmPodList.getItems().isEmpty()) {
jobManagerRunning = false;
}
}
if (taskManagerRunning) {
PodList tmPodList = getTmPodList(namespace, clusterId);
if (tmPodList.getItems().isEmpty()) {
taskManagerRunning = false;
}
}
if (serviceRunning) {
Service service =
kubernetesClient
.services()
.inNamespace(namespace)
.withName(
ExternalServiceDecorator.getExternalServiceName(clusterId))
.get();
if (service == null) {
serviceRunning = false;
}
}
if (!jobManagerRunning && !serviceRunning && !taskManagerRunning) {
break;
}
// log a message waiting to shutdown Flink cluster every 5 seconds.
if ((i + 1) % 5 == 0) {
LOG.info("Waiting for cluster shutdown... ({}s)", i + 1);
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
LOG.info("Cluster shutdown completed.");
}