void cleanupSavepointHistoryLegacy()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java [334:391]


    /**
     * Prunes the legacy savepoint history kept in the resource status, taking into account the
     * completed savepoint {@code FlinkStateSnapshot} resources that already exist.
     *
     * <p>The number of legacy entries allowed to remain is the value returned by {@code
     * getMaxCountForSnapshotType} minus the number of completed savepoint snapshot resources
     * (never below zero). Oldest entries are dropped first until the history fits that budget;
     * afterwards every remaining entry whose timestamp is below the threshold returned by {@code
     * getMinAgeForSnapshotType} is dropped as well. As long as no completed savepoint snapshot
     * resource exists, the newest legacy entry is exempt from the timestamp-based pass. Removed
     * entries are handed to {@code disposeSavepointQuietly} only when {@code
     * OPERATOR_SAVEPOINT_CLEANUP_ENABLED} is set in the observe config.
     *
     * @param ctx context of the resource whose savepoint history is pruned in place
     * @param allSecondarySnapshotResources snapshot resources inspected for completed savepoints
     */
    void cleanupSavepointHistoryLegacy(
            FlinkResourceContext<CR> ctx, Set<FlinkStateSnapshot> allSecondarySnapshotResources) {
        var timestampThreshold =
                getMinAgeForSnapshotType(
                        ctx.getObserveConfig(), ctx.getOperatorConfig(), SAVEPOINT);
        var configuredMaxCount =
                getMaxCountForSnapshotType(
                        ctx.getObserveConfig(), ctx.getOperatorConfig(), SAVEPOINT);

        // Count the snapshot resources that are completed savepoints.
        long completedSavepointCount = 0;
        for (var snapshot : allSecondarySnapshotResources) {
            var snapshotStatus = snapshot.getStatus();
            if (snapshotStatus != null
                    && COMPLETED.equals(snapshotStatus.getState())
                    && snapshot.getSpec().isSavepoint()) {
                completedSavepointCount++;
            }
        }

        // Each completed snapshot resource takes up one slot of the allowed history size.
        var remainingLegacySlots = Math.max(0, configuredMaxCount - completedSavepointCount);

        var jobStatus = ctx.getResource().getStatus().getJobStatus();
        var history = jobStatus.getSavepointInfo().getSavepointHistory();

        var disposeRemoved = ctx.getObserveConfig().get(OPERATOR_SAVEPOINT_CLEANUP_ENABLED);

        if (history.isEmpty()) {
            return;
        }
        // The newest legacy entry only becomes eligible for cleanup once at least one completed
        // savepoint FlinkStateSnapshot CR exists.
        var keepNewestEntry = completedSavepointCount == 0;
        if (keepNewestEntry && history.size() == 1) {
            return;
        }
        var newestEntry = history.get(history.size() - 1);

        // Count-based pass: drop entries from the front (oldest) until the history fits.
        while (history.size() > remainingLegacySlots) {
            var evicted = history.remove(0);
            if (disposeRemoved) {
                disposeSavepointQuietly(ctx, evicted.getLocation());
            }
        }

        // Timestamp-based pass over whatever is left.
        for (var cursor = history.iterator(); cursor.hasNext(); ) {
            var entry = cursor.next();
            if (keepNewestEntry && entry == newestEntry) {
                continue;
            }
            if (entry.getTimeStamp() < timestampThreshold) {
                cursor.remove();
                if (disposeRemoved) {
                    disposeSavepointQuietly(ctx, entry.getLocation());
                }
            }
        }
    }