in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java [69:112]
/**
 * Folds a freshly observed health sample into the last valid health record kept for the
 * cluster.
 *
 * <p>Samples rejected by {@code ClusterHealthInfo.isValid} are ignored. If no last valid record
 * exists yet, the sample becomes the baseline and no health check is run. Before it is stored,
 * its restart-evaluation and checkpoint-increase timestamps are set to the sample's own
 * timestamp. Otherwise the record and the sample are compared by {@code evaluateRestarts}.
 * {@code evaluateCheckpoints} runs only if that check passes. The record then takes the sample's
 * timestamp and the combined verdict, and is stored again.
 *
 * @param configuration configuration handed to the restart and checkpoint checks
 * @param clusterInfo map passed to {@code getLastValidClusterHealthInfo} / {@code
 *     setLastValidClusterHealthInfo}; presumably the persisted record lives here (verify)
 * @param observedClusterHealthInfo the newly observed health sample
 * @throws IllegalStateException if the sample's timestamp is older than the stored record's
 */
public void evaluate(
        Configuration configuration,
        Map<String, String> clusterInfo,
        ClusterHealthInfo observedClusterHealthInfo) {
    if (!ClusterHealthInfo.isValid(observedClusterHealthInfo)) {
        // Invalid samples are dropped; the stored record is left as it is.
        return;
    }
    LOG.debug("Observed health info is valid");

    var stored = getLastValidClusterHealthInfo(clusterInfo);
    if (stored == null) {
        // First valid sample: it becomes the baseline and no verdict is computed yet.
        LOG.debug("No last valid health info, skipping health check");
        var baselineTimeStamp = observedClusterHealthInfo.getTimeStamp();
        observedClusterHealthInfo.setNumRestartsEvaluationTimeStamp(baselineTimeStamp);
        observedClusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(
                baselineTimeStamp);
        setLastValidClusterHealthInfo(clusterInfo, observedClusterHealthInfo);
        return;
    }

    if (stored.getTimeStamp() > observedClusterHealthInfo.getTimeStamp()) {
        // Samples must never go back in time relative to the stored record.
        String msg =
                "Observed health info timestamp is less than the last valid health info timestamp, this indicates a bug...";
        LOG.error(msg);
        throw new IllegalStateException(msg);
    }

    LOG.debug("Valid health info exist, checking cluster health");
    LOG.debug("Last valid health info: {}", stored);
    LOG.debug("Observed health info: {}", observedClusterHealthInfo);

    // Equivalent to a short-circuit '&&': checkpoint progress is only examined when the
    // restart check has passed.
    boolean healthy =
            evaluateRestarts(configuration, clusterInfo, stored, observedClusterHealthInfo);
    if (healthy) {
        healthy = evaluateCheckpoints(configuration, stored, observedClusterHealthInfo);
    }

    stored.setTimeStamp(observedClusterHealthInfo.getTimeStamp());
    stored.setHealthy(healthy);
    setLastValidClusterHealthInfo(clusterInfo, stored);
}