private boolean evaluateCheckpoints()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java [173:255]


    /**
     * Evaluates whether the job is still making checkpoint progress.
     *
     * <p>The check is skipped (reported healthy) when the checkpoint-progress health check is
     * disabled, or when no explicit check window is configured and checkpointing is disabled in
     * the configuration. Otherwise the effective check window is the configured window, unless
     * that is shorter than {@code max(interval * n, timeout * n)} with {@code n = tolerable
     * checkpoint failures + 2}, in which case that minimum is used instead.
     *
     * <p>Side effects: whenever the observed number of completed checkpoints differs from the
     * last valid one, {@code lastValidClusterHealthInfo} is updated with the observed count and
     * the observed timestamp.
     *
     * @param configuration configuration from which the health-check and checkpoint settings
     *     are read
     * @param lastValidClusterHealthInfo last health info considered valid; may be mutated
     * @param observedClusterHealthInfo the most recently observed health info
     * @return {@code false} if the completed-checkpoint count has not increased for longer than
     *     the check window, {@code true} otherwise
     */
    private boolean evaluateCheckpoints(
            Configuration configuration,
            ClusterHealthInfo lastValidClusterHealthInfo,
            ClusterHealthInfo observedClusterHealthInfo) {
        if (!configuration.getBoolean(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED)) {
            return true;
        }

        var windowOpt =
                configuration.getOptional(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW);

        CheckpointConfig checkpointConfig = new CheckpointConfig();
        checkpointConfig.configure(configuration);

        if (windowOpt.isEmpty() && !checkpointConfig.isCheckpointingEnabled()) {
            // If no explicit checkpoint check window is specified and checkpointing is disabled
            // based on the config, we don't do anything
            return true;
        }

        var checkpointingInterval = checkpointConfig.getCheckpointInterval();
        var checkpointingTimeout = checkpointConfig.getCheckpointTimeout();
        // Widen to long before adding: with a very large tolerable failure number (e.g.
        // Integer.MAX_VALUE) the int addition would wrap to a negative factor, yielding a
        // negative minimum window that flags the cluster unhealthy as soon as progress stalls.
        long tolerationFailureNumber =
                (long) checkpointConfig.getTolerableCheckpointFailureNumber() + 2;
        var minCheckWindow =
                Duration.ofMillis(
                        Math.max(
                                checkpointingInterval * tolerationFailureNumber,
                                checkpointingTimeout * tolerationFailureNumber));

        // A configured window shorter than the minimum is ignored in favor of the minimum.
        var completedCheckpointsCheckWindow =
                windowOpt
                        .filter(
                                d -> {
                                    if (d.compareTo(minCheckWindow) < 0) {
                                        LOG.debug(
                                                "{} is not long enough. Default to max({} * ({} + 2), {} * ({} + 2)): {}",
                                                OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW
                                                        .key(),
                                                CHECKPOINTING_INTERVAL.key(),
                                                TOLERABLE_FAILURE_NUMBER.key(),
                                                CHECKPOINTING_TIMEOUT.key(),
                                                TOLERABLE_FAILURE_NUMBER.key(),
                                                minCheckWindow);
                                        return false;
                                    }
                                    return true;
                                })
                        .orElse(minCheckWindow);

        if (observedClusterHealthInfo.getNumCompletedCheckpoints()
                < lastValidClusterHealthInfo.getNumCompletedCheckpoints()) {
            // The count went backwards: re-baseline on the observed values instead of judging.
            LOG.debug(
                    "Observed health info number of completed checkpoints is less than in the last valid health info, skipping health check");
            lastValidClusterHealthInfo.setNumCompletedCheckpoints(
                    observedClusterHealthInfo.getNumCompletedCheckpoints());
            lastValidClusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(
                    observedClusterHealthInfo.getTimeStamp());
            return true;
        }

        var timestampDiffMs =
                observedClusterHealthInfo.getTimeStamp()
                        - lastValidClusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp();
        LOG.debug("Time difference between health infos: {}", Duration.ofMillis(timestampDiffMs));

        boolean isHealthy = true;
        var completedCheckpointsCheckWindowMs = completedCheckpointsCheckWindow.toMillis();

        if (observedClusterHealthInfo.getNumCompletedCheckpoints()
                > lastValidClusterHealthInfo.getNumCompletedCheckpoints()) {
            LOG.debug("Last valid number of completed checkpoints increased marking timestamp");
            lastValidClusterHealthInfo.setNumCompletedCheckpoints(
                    observedClusterHealthInfo.getNumCompletedCheckpoints());
            lastValidClusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(
                    observedClusterHealthInfo.getTimeStamp());
        } else if (clock.millis()
                        - lastValidClusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp()
                > completedCheckpointsCheckWindowMs) {
            // Compare elapsed time against the window rather than computing
            // "timestamp + window", which could overflow for very large windows.
            LOG.info("Cluster is not able to complete checkpoints");
            isHealthy = false;
        }

        return isHealthy;
    }