void cleanupSavepointHistory()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java [214:261]


    void cleanupSavepointHistory(FlinkResourceContext<CR> ctx, SavepointInfo currentSavepointInfo) {

        var observeConfig = ctx.getObserveConfig();
        var flinkService = ctx.getFlinkService();
        boolean savepointCleanupEnabled =
                observeConfig.getBoolean(
                        KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_CLEANUP_ENABLED);

        // maintain history
        List<Savepoint> savepointHistory = currentSavepointInfo.getSavepointHistory();
        if (savepointHistory.size() < 2) {
            return;
        }
        var lastSavepoint = savepointHistory.get(savepointHistory.size() - 1);

        int maxCount =
                Math.max(
                        1,
                        ConfigOptionUtils.getValueWithThreshold(
                                observeConfig,
                                KubernetesOperatorConfigOptions
                                        .OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT,
                                ctx.getOperatorConfig().getSavepointHistoryCountThreshold()));
        while (savepointHistory.size() > maxCount) {
            // remove oldest entries
            Savepoint sp = savepointHistory.remove(0);
            if (savepointCleanupEnabled) {
                disposeSavepointQuietly(flinkService, sp, observeConfig);
            }
        }

        Duration maxAge =
                ConfigOptionUtils.getValueWithThreshold(
                        observeConfig,
                        KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE,
                        ctx.getOperatorConfig().getSavepointHistoryAgeThreshold());
        long maxTms = System.currentTimeMillis() - maxAge.toMillis();
        Iterator<Savepoint> it = savepointHistory.iterator();
        while (it.hasNext()) {
            Savepoint sp = it.next();
            if (sp.getTimeStamp() < maxTms && sp != lastSavepoint) {
                it.remove();
                if (savepointCleanupEnabled) {
                    disposeSavepointQuietly(flinkService, sp, observeConfig);
                }
            }
        }
    }