private boolean evaluateCheckpoints()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java [169:213]


    /**
     * Evaluates whether the cluster is still making checkpoint progress.
     *
     * <p>The check is skipped (reported healthy) when
     * {@code OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED} is not set to true in the
     * given configuration. Otherwise the observed number of completed checkpoints is compared
     * with the last valid one:
     *
     * <ul>
     *   <li>lower: the last valid count and its "increased" timestamp are overwritten with the
     *       observed values and the check is skipped;
     *   <li>higher: the last valid count and its "increased" timestamp are overwritten with the
     *       observed values and the cluster is reported healthy;
     *   <li>equal: the cluster is reported unhealthy only if the last "increased" timestamp plus
     *       the window read from {@code OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW}
     *       lies strictly before {@code clock.millis()}.
     * </ul>
     *
     * @param configuration configuration the enable flag and the progress window are read from
     * @param lastValidClusterHealthInfo last valid health info; updated in place when the observed
     *     checkpoint count differs from the stored one
     * @param observedClusterHealthInfo freshly observed health info; only read
     * @return {@code false} if the completed checkpoint count has not increased within the
     *     configured window, {@code true} otherwise
     */
    private boolean evaluateCheckpoints(
            Configuration configuration,
            ClusterHealthInfo lastValidClusterHealthInfo,
            ClusterHealthInfo observedClusterHealthInfo) {
        if (!configuration.getBoolean(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED)) {
            return true;
        }

        final var observedCheckpoints = observedClusterHealthInfo.getNumCompletedCheckpoints();
        final var lastValidCheckpoints = lastValidClusterHealthInfo.getNumCompletedCheckpoints();
        final var observedTimestamp = observedClusterHealthInfo.getTimeStamp();

        // The counter went backwards: re-baseline on the observed values and skip this round.
        if (observedCheckpoints < lastValidCheckpoints) {
            LOG.debug(
                    "Observed health info number of completed checkpoints is less than in the last valid health info, skipping health check");
            lastValidClusterHealthInfo.setNumCompletedCheckpoints(observedCheckpoints);
            lastValidClusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(
                    observedTimestamp);
            return true;
        }

        // This difference is only logged; the staleness decision below uses clock.millis().
        final var sinceLastIncreaseMs =
                observedTimestamp
                        - lastValidClusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp();
        LOG.debug(
                "Time difference between health infos: {}", Duration.ofMillis(sinceLastIncreaseMs));

        final var progressWindowMs =
                configuration
                        .get(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW)
                        .toMillis();

        // New checkpoints completed: record the new count and when it was observed.
        if (observedCheckpoints > lastValidCheckpoints) {
            LOG.debug("Last valid number of completed checkpoints increased marking timestamp");
            lastValidClusterHealthInfo.setNumCompletedCheckpoints(observedCheckpoints);
            lastValidClusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(
                    observedTimestamp);
            return true;
        }

        // Count unchanged: unhealthy once the last increase is older than the configured window.
        final var progressDeadlineMs =
                lastValidClusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp()
                        + progressWindowMs;
        if (progressDeadlineMs < clock.millis()) {
            LOG.info("Cluster is not able to complete checkpoints");
            return false;
        }

        return true;
    }