private boolean evaluateRestarts()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java [114:167]


    /**
     * Checks whether the number of restarts observed since the last valid health info stays
     * within the configured threshold.
     *
     * <p>When the observed restart count is lower than the one stored in the last valid health
     * info, the check is skipped: the last valid health info is overwritten with the observed
     * restart count and timestamp, and the method reports healthy.
     *
     * <p>Otherwise the restart count difference is scaled by {@code window / timeDifference},
     * capped at 1, so that a difference accumulated over a period longer than the configured
     * window is reduced proportionally before being compared to the threshold. Finally, when the
     * stored evaluation timestamp is older than {@code clock.millis() - window}, the last valid
     * health info is moved forward to the observed restart count and timestamp.
     *
     * <p>NOTE(review): a negative time difference is not special-cased; it produces a negative
     * multiplier and therefore (for a positive window) a non-positive scaled count. Confirm this
     * is intended.
     *
     * @param configuration source of the restart check window and restart threshold options
     * @param clusterInfo not read by this method
     * @param lastValidClusterHealthInfo baseline health info; may be modified by this method
     * @param observedClusterHealthInfo freshly observed health info
     * @return {@code true} if the check was skipped or the scaled restart count does not exceed
     *     the threshold, {@code false} otherwise
     */
    private boolean evaluateRestarts(
            Configuration configuration,
            Map<String, String> clusterInfo,
            ClusterHealthInfo lastValidClusterHealthInfo,
            ClusterHealthInfo observedClusterHealthInfo) {

        var observedRestarts = observedClusterHealthInfo.getNumRestarts();
        var baselineRestarts = lastValidClusterHealthInfo.getNumRestarts();
        if (observedRestarts < baselineRestarts) {
            LOG.debug(
                    "Observed health info number of restarts is less than in the last valid health info, skipping health check");
            resetRestartBaseline(lastValidClusterHealthInfo, observedClusterHealthInfo);
            return true;
        }

        var baselineTimestamp = lastValidClusterHealthInfo.getNumRestartsEvaluationTimeStamp();
        var elapsedMs = observedClusterHealthInfo.getTimeStamp() - baselineTimestamp;
        LOG.debug("Time difference between health infos: {}", Duration.ofMillis(elapsedMs));

        var restartCheckWindow = configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW);
        var restartCheckWindowMs = restartCheckWindow.toMillis();
        // A ratio above 1 means both health infos fall within the window, so the raw
        // difference is used without scaling.
        double windowRatio = (double) restartCheckWindowMs / (double) elapsedMs;
        double countMultiplier = windowRatio > 1 ? 1 : windowRatio;
        var restartDelta = observedRestarts - baselineRestarts;
        long numRestarts = (long) ((double) restartDelta * countMultiplier);
        LOG.debug("Calculated restart count for {} window: {}", restartCheckWindow, numRestarts);

        var restartThreshold = configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD);
        boolean thresholdExceeded = numRestarts > restartThreshold;
        if (thresholdExceeded) {
            LOG.info("Restart count hit threshold: {}", restartThreshold);
        }

        // Move the baseline forward once its evaluation timestamp has dropped out of the window.
        var windowStart = clock.millis() - restartCheckWindowMs;
        if (baselineTimestamp < windowStart) {
            LOG.debug(
                    "Last valid number of restarts evaluation timestamp is outside of the window");
            resetRestartBaseline(lastValidClusterHealthInfo, observedClusterHealthInfo);
        }

        return !thresholdExceeded;
    }

    /** Copies the observed restart count and timestamp into the last valid health info. */
    private static void resetRestartBaseline(
            ClusterHealthInfo lastValidClusterHealthInfo,
            ClusterHealthInfo observedClusterHealthInfo) {
        lastValidClusterHealthInfo.setNumRestarts(observedClusterHealthInfo.getNumRestarts());
        lastValidClusterHealthInfo.setNumRestartsEvaluationTimeStamp(
                observedClusterHealthInfo.getTimeStamp());
    }