in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java [114:167]
/**
 * Checks the observed restart count against the configured restart threshold.
 *
 * <p>The evaluation proceeds in three steps:
 *
 * <ol>
 *   <li>If the observed restart count is lower than the baseline count, the baseline is
 *       overwritten with the observed values and the method returns {@code true} without any
 *       further evaluation.
 *   <li>Otherwise the difference between the observed and baseline restart counts is multiplied
 *       by {@code window / elapsed} (capped at 1), truncated to a {@code long} and compared
 *       with the configured threshold.
 *   <li>If the baseline evaluation timestamp is older than {@code clock.millis() - window}, the
 *       baseline is overwritten with the observed values.
 * </ol>
 *
 * @param configuration provides the restart check window and the restart threshold
 * @param clusterInfo not read by this method
 * @param lastValidClusterHealthInfo baseline health info; its restart count and restart
 *     evaluation timestamp may be overwritten
 * @param observedClusterHealthInfo most recently observed health info; only read
 * @return {@code true} if the check was skipped or the scaled restart count does not exceed the
 *     threshold, {@code false} otherwise
 */
private boolean evaluateRestarts(
        Configuration configuration,
        Map<String, String> clusterInfo,
        ClusterHealthInfo lastValidClusterHealthInfo,
        ClusterHealthInfo observedClusterHealthInfo) {
    var observedRestarts = observedClusterHealthInfo.getNumRestarts();
    var baselineRestarts = lastValidClusterHealthInfo.getNumRestarts();

    // A shrinking counter cannot be compared with the baseline (presumably the counter was
    // reset - confirm with the producer of the health info), so only re-baseline here.
    if (observedRestarts < baselineRestarts) {
        LOG.debug(
                "Observed health info number of restarts is less than in the last valid health info, skipping health check");
        rebaseRestartTracking(lastValidClusterHealthInfo, observedClusterHealthInfo);
        return true;
    }

    var observedTimestampMs = observedClusterHealthInfo.getTimeStamp();
    var baselineTimestampMs = lastValidClusterHealthInfo.getNumRestartsEvaluationTimeStamp();
    var elapsedMs = observedTimestampMs - baselineTimestampMs;
    LOG.debug("Time difference between health infos: {}", Duration.ofMillis(elapsedMs));

    var window = configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW);
    var windowMs = window.toMillis();

    // Scale the restart delta down only when the two samples are further apart than the
    // window; samples taken within the window are counted as-is (factor capped at 1).
    double windowToElapsed = (double) windowMs / (double) elapsedMs;
    double scale = windowToElapsed > 1 ? 1 : windowToElapsed;
    long scaledRestarts = (long) ((double) (observedRestarts - baselineRestarts) * scale);
    LOG.debug("Calculated restart count for {} window: {}", window, scaledRestarts);

    var threshold = configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD);
    boolean overThreshold = scaledRestarts > threshold;
    if (overThreshold) {
        LOG.info("Restart count hit threshold: {}", threshold);
    }

    // The verdict above is already fixed; moving the baseline only affects later evaluations.
    if (clock.millis() - windowMs > baselineTimestampMs) {
        LOG.debug(
                "Last valid number of restarts evaluation timestamp is outside of the window");
        rebaseRestartTracking(lastValidClusterHealthInfo, observedClusterHealthInfo);
    }

    return !overThreshold;
}

/** Copies the observed restart count and timestamp into the baseline health info. */
private void rebaseRestartTracking(ClusterHealthInfo baseline, ClusterHealthInfo observed) {
    baseline.setNumRestarts(observed.getNumRestarts());
    baseline.setNumRestartsEvaluationTimeStamp(observed.getTimeStamp());
}