public void evaluate()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java [69:112]


    public void evaluate(
            Configuration configuration,
            Map<String, String> clusterInfo,
            ClusterHealthInfo observedClusterHealthInfo) {

        if (ClusterHealthInfo.isValid(observedClusterHealthInfo)) {
            LOG.debug("Observed health info is valid");

            var lastValidClusterHealthInfo = getLastValidClusterHealthInfo(clusterInfo);
            if (lastValidClusterHealthInfo == null) {
                LOG.debug("No last valid health info, skipping health check");
                observedClusterHealthInfo.setNumRestartsEvaluationTimeStamp(
                        observedClusterHealthInfo.getTimeStamp());
                observedClusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(
                        observedClusterHealthInfo.getTimeStamp());
                setLastValidClusterHealthInfo(clusterInfo, observedClusterHealthInfo);
            } else if (observedClusterHealthInfo.getTimeStamp()
                    < lastValidClusterHealthInfo.getTimeStamp()) {
                String msg =
                        "Observed health info timestamp is less than the last valid health info timestamp, this indicates a bug...";
                LOG.error(msg);
                throw new IllegalStateException(msg);
            } else {
                LOG.debug("Valid health info exist, checking cluster health");
                LOG.debug("Last valid health info: {}", lastValidClusterHealthInfo);
                LOG.debug("Observed health info: {}", observedClusterHealthInfo);

                boolean isHealthy =
                        evaluateRestarts(
                                        configuration,
                                        clusterInfo,
                                        lastValidClusterHealthInfo,
                                        observedClusterHealthInfo)
                                && evaluateCheckpoints(
                                        configuration,
                                        lastValidClusterHealthInfo,
                                        observedClusterHealthInfo);

                lastValidClusterHealthInfo.setTimeStamp(observedClusterHealthInfo.getTimeStamp());
                lastValidClusterHealthInfo.setHealthy(isHealthy);
                setLastValidClusterHealthInfo(clusterInfo, lastValidClusterHealthInfo);
            }
        }
    }