in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java [173:255]
/**
 * Checks whether the cluster is still completing checkpoints.
 *
 * <p>Always returns {@code true} when the checkpoint progress check is disabled, or when no
 * explicit progress window is configured and checkpointing itself is disabled. Otherwise the
 * cluster is reported unhealthy once the number of completed checkpoints has not increased for
 * longer than the check window. The window is the configured {@code
 * OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW}. When that value is absent, or is
 * shorter than {@code max(interval * n, timeout * n)} with {@code n = tolerableFailedCheckpoints
 * + 2}, the window falls back to that minimum.
 *
 * <p>Whenever the observed completed checkpoint count differs from the one stored in {@code
 * lastValidClusterHealthInfo}, the stored count and its timestamp are replaced by the observed
 * values. A decreasing count skips the check for this round.
 *
 * @param configuration configuration providing the health-check and checkpointing settings
 * @param lastValidClusterHealthInfo baseline health info; mutated as described above
 * @param observedClusterHealthInfo the most recently observed health info
 * @return {@code false} if checkpoint progress has stalled for longer than the check window,
 *     {@code true} otherwise
 */
private boolean evaluateCheckpoints(
        Configuration configuration,
        ClusterHealthInfo lastValidClusterHealthInfo,
        ClusterHealthInfo observedClusterHealthInfo) {
    if (!configuration.getBoolean(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED)) {
        return true;
    }
    var windowOpt =
            configuration.getOptional(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW);
    CheckpointConfig checkpointConfig = new CheckpointConfig();
    checkpointConfig.configure(configuration);
    if (windowOpt.isEmpty() && !checkpointConfig.isCheckpointingEnabled()) {
        // If no explicit checkpoint check window is specified and checkpointing is disabled
        // based on the config, we don't do anything
        return true;
    }

    var checkpointingInterval = checkpointConfig.getCheckpointInterval();
    var checkpointingTimeout = checkpointConfig.getCheckpointTimeout();
    // Widen to long before adding 2: in int arithmetic a tolerable failure number of
    // Integer.MAX_VALUE - 1 or Integer.MAX_VALUE wraps to a negative factor. That made the
    // default window negative, so the cluster was flagged unhealthy on the first observation
    // without a newly completed checkpoint.
    long failureToleranceFactor =
            (long) checkpointConfig.getTolerableCheckpointFailureNumber() + 2;
    var minCheckWindow =
            Duration.ofMillis(
                    Math.max(
                            saturatedMultiply(checkpointingInterval, failureToleranceFactor),
                            saturatedMultiply(checkpointingTimeout, failureToleranceFactor)));

    // An explicitly configured window is only honoured if it is at least the minimum window.
    var completedCheckpointsCheckWindow =
            windowOpt
                    .filter(
                            d -> {
                                if (d.compareTo(minCheckWindow) < 0) {
                                    LOG.debug(
                                            "{} is not long enough. Default to max({} * ({} + 2), {} * ({} + 2)): {}",
                                            OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW
                                                    .key(),
                                            CHECKPOINTING_INTERVAL.key(),
                                            TOLERABLE_FAILURE_NUMBER.key(),
                                            CHECKPOINTING_TIMEOUT.key(),
                                            TOLERABLE_FAILURE_NUMBER.key(),
                                            minCheckWindow);
                                    return false;
                                }
                                return true;
                            })
                    .orElse(minCheckWindow);

    if (observedClusterHealthInfo.getNumCompletedCheckpoints()
            < lastValidClusterHealthInfo.getNumCompletedCheckpoints()) {
        // The count went backwards; re-baseline on the observed values and skip this round.
        LOG.debug(
                "Observed health info number of completed checkpoints is less than in the last valid health info, skipping health check");
        lastValidClusterHealthInfo.setNumCompletedCheckpoints(
                observedClusterHealthInfo.getNumCompletedCheckpoints());
        lastValidClusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(
                observedClusterHealthInfo.getTimeStamp());
        return true;
    }

    var timestampDiffMs =
            observedClusterHealthInfo.getTimeStamp()
                    - lastValidClusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp();
    LOG.debug("Time difference between health infos: {}", Duration.ofMillis(timestampDiffMs));
    var completedCheckpointsCheckWindowMs = completedCheckpointsCheckWindow.toMillis();

    if (observedClusterHealthInfo.getNumCompletedCheckpoints()
            > lastValidClusterHealthInfo.getNumCompletedCheckpoints()) {
        LOG.debug("Last valid number of completed checkpoints increased marking timestamp");
        lastValidClusterHealthInfo.setNumCompletedCheckpoints(
                observedClusterHealthInfo.getNumCompletedCheckpoints());
        lastValidClusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(
                observedClusterHealthInfo.getTimeStamp());
        return true;
    }

    // Compare the elapsed time against the window instead of adding the window to the
    // timestamp: with a very large window that sum can overflow and wrap to a negative value,
    // which would falsely report the cluster as unhealthy.
    var millisSinceLastIncrease =
            clock.millis()
                    - lastValidClusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp();
    if (millisSinceLastIncrease > completedCheckpointsCheckWindowMs) {
        LOG.info("Cluster is not able to complete checkpoints");
        return false;
    }
    return true;
}

/**
 * Multiplies two longs, clamping the result to {@link Long#MAX_VALUE} or {@link
 * Long#MIN_VALUE} instead of silently wrapping around on overflow.
 */
private static long saturatedMultiply(long a, long b) {
    long high = Math.multiplyHigh(a, b);
    long low = a * b;
    // The exact 128-bit product fits in a long iff the high word is the sign extension of the
    // low word; otherwise the sign of the high word gives the sign of the true product.
    if (high == (low >> 63)) {
        return low;
    }
    return high < 0 ? Long.MIN_VALUE : Long.MAX_VALUE;
}