in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java [294:327]
/**
 * Decides whether the job should be restarted because the last recorded cluster health info
 * reports the cluster as unhealthy.
 *
 * <p>Returns {@code false} without further action when the health check option is disabled in
 * {@code observeConfig}, when no valid health info is available from the deployment's cluster
 * info, or when that health info reports healthy. For an unhealthy cluster a restart is
 * requested only if the job's upgrade mode is {@code STATELESS} or high availability is
 * activated in {@code observeConfig}; otherwise a warning is logged and no restart is requested.
 *
 * <p>Side effect: when a restart is requested, the last valid health info entry is removed from
 * the deployment's cluster info via {@code ClusterHealthEvaluator}.
 *
 * @param deployment the deployment whose status and job spec are inspected
 * @param observeConfig configuration consulted for the health check flag and the HA mode
 * @return {@code true} if the job should be restarted, {@code false} otherwise
 */
private boolean shouldRestartJobBecauseUnhealthy(
        FlinkDeployment deployment, Configuration observeConfig) {
    // Feature switch: nothing is read from the deployment unless the check is enabled.
    if (!observeConfig.getBoolean(OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED)) {
        return false;
    }

    var clusterInfo = deployment.getStatus().getClusterInfo();
    ClusterHealthInfo lastHealthInfo =
            ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo);
    if (lastHealthInfo == null) {
        LOG.debug("Cluster info not contains job health info, skipping health check");
        return false;
    }

    LOG.debug("Cluster info contains job health info");
    if (lastHealthInfo.isHealthy()) {
        return false;
    }

    // Unhealthy cluster: the HA lookup is only performed when the job is not stateless.
    if (UpgradeMode.STATELESS == deployment.getSpec().getJob().getUpgradeMode()) {
        LOG.debug("Stateless job, recovering unhealthy jobmanager deployment");
    } else if (HighAvailabilityMode.isHighAvailabilityModeActivated(observeConfig)) {
        LOG.debug("HA is enabled, recovering unhealthy jobmanager deployment");
    } else {
        LOG.warn("Could not recover unhealthy jobmanager deployment without HA enabled");
        return false;
    }

    // A restart was decided, so drop the recorded health info from the cluster info.
    ClusterHealthEvaluator.removeLastValidClusterHealthInfo(clusterInfo);
    return true;
}