in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java [169:213]
/**
 * Evaluates whether the cluster is still making checkpoint progress.
 *
 * <p>The check is skipped (reported healthy) when {@code
 * OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED} is false. Otherwise the number of
 * completed checkpoints in {@code observedClusterHealthInfo} is compared with the baseline held
 * in {@code lastValidClusterHealthInfo}:
 *
 * <ul>
 *   <li>fewer completed checkpoints than the baseline: the baseline is reset to the observed
 *       count/timestamp and the cluster is reported healthy (presumably the counter restarted,
 *       e.g. after a job restart — not verifiable from this method);
 *   <li>more completed checkpoints: the baseline is advanced to the observed count/timestamp and
 *       the cluster is reported healthy;
 *   <li>same count: the cluster is reported unhealthy once more than {@code
 *       OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW} has elapsed, measured with
 *       {@code clock}, since the baseline count was recorded.
 * </ul>
 *
 * <p>This method mutates {@code lastValidClusterHealthInfo} in place.
 *
 * @param configuration operator configuration holding the checkpoint progress check options
 * @param lastValidClusterHealthInfo baseline health info; updated whenever the observed
 *     completed-checkpoint count differs from it
 * @param observedClusterHealthInfo the most recently observed health info
 * @return {@code false} if no checkpoint completed within the configured window, {@code true}
 *     otherwise
 */
private boolean evaluateCheckpoints(
        Configuration configuration,
        ClusterHealthInfo lastValidClusterHealthInfo,
        ClusterHealthInfo observedClusterHealthInfo) {
    // Read the flag with get(...), consistent with the window option lookup below.
    if (!configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED)) {
        return true;
    }

    var observedCompleted = observedClusterHealthInfo.getNumCompletedCheckpoints();
    var lastValidCompleted = lastValidClusterHealthInfo.getNumCompletedCheckpoints();

    if (observedCompleted < lastValidCompleted) {
        LOG.debug(
                "Observed health info number of completed checkpoints is less than in the last valid health info, skipping health check");
        resetCompletedCheckpointsBaseline(lastValidClusterHealthInfo, observedClusterHealthInfo);
        return true;
    }

    var timestampDiffMs =
            observedClusterHealthInfo.getTimeStamp()
                    - lastValidClusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp();
    LOG.debug("Time difference between health infos: {}", Duration.ofMillis(timestampDiffMs));

    var completedCheckpointsCheckWindow =
            configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW);

    if (observedCompleted > lastValidCompleted) {
        LOG.debug("Last valid number of completed checkpoints increased marking timestamp");
        resetCompletedCheckpointsBaseline(lastValidClusterHealthInfo, observedClusterHealthInfo);
        return true;
    }

    // Compare Durations instead of computing "timestamp + window.toMillis()": that sum can
    // overflow for very large windows (wrongly flagging the cluster as unhealthy), and
    // Duration#toMillis throws ArithmeticException for windows that do not fit in a long.
    var sinceLastIncrease =
            Duration.ofMillis(
                    clock.millis()
                            - lastValidClusterHealthInfo
                                    .getNumCompletedCheckpointsIncreasedTimeStamp());
    if (sinceLastIncrease.compareTo(completedCheckpointsCheckWindow) > 0) {
        LOG.info("Cluster is not able to complete checkpoints");
        return false;
    }
    return true;
}

/**
 * Makes the observed completed-checkpoint count and its timestamp the new baseline.
 *
 * @param lastValidClusterHealthInfo baseline health info to update in place
 * @param observedClusterHealthInfo source of the new count and timestamp
 */
private static void resetCompletedCheckpointsBaseline(
        ClusterHealthInfo lastValidClusterHealthInfo,
        ClusterHealthInfo observedClusterHealthInfo) {
    lastValidClusterHealthInfo.setNumCompletedCheckpoints(
            observedClusterHealthInfo.getNumCompletedCheckpoints());
    lastValidClusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(
            observedClusterHealthInfo.getTimeStamp());
}