Set getFlinkStateSnapshotsToCleanUp()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java [273:323]


    /**
     * Picks the snapshots of the requested type that should be cleaned up.
     *
     * <p>Only snapshots that are not already marked for deletion, whose trigger type is contained
     * in {@code CLEAN_UP_SNAPSHOT_TRIGGER_TYPES}, and whose spec matches {@code snapshotType} are
     * considered. These candidates are visited from oldest to newest, as ordered by {@code
     * EXTRACT_SNAPSHOT_TIME}. A candidate is selected when the number of candidates not yet
     * selected still exceeds the configured maximum count, or when its timestamp (epoch millis)
     * lies before the threshold returned by {@code getMinAgeForSnapshotType}.
     *
     * <p>The most recent {@code COMPLETED} candidate is never selected, and at least one candidate
     * is always left out of the result. With fewer than two candidates nothing is selected.
     *
     * @param snapshots snapshots to choose from
     * @param observeConfig configuration handed to the max-count and age-threshold lookups
     * @param operatorConfig operator configuration handed to the same lookups
     * @param snapshotType snapshot type to consider; savepoint specs are matched only for {@code
     *     SAVEPOINT}
     * @return a mutable set holding the snapshots selected for clean-up, possibly empty
     */
    Set<FlinkStateSnapshot> getFlinkStateSnapshotsToCleanUp(
            Collection<FlinkStateSnapshot> snapshots,
            Configuration observeConfig,
            FlinkOperatorConfiguration operatorConfig,
            SnapshotType snapshotType) {
        final boolean savepointsRequested = snapshotType == SAVEPOINT;
        final Comparator<FlinkStateSnapshot> oldestFirst =
                Comparator.comparing(EXTRACT_SNAPSHOT_TIME);

        // Candidates for clean-up, ordered from oldest to newest.
        var candidates =
                snapshots.stream()
                        .filter(
                                s ->
                                        !s.isMarkedForDeletion()
                                                && CLEAN_UP_SNAPSHOT_TRIGGER_TYPES.contains(
                                                        FlinkStateSnapshotUtils
                                                                .getSnapshotTriggerType(s))
                                                && (s.getSpec().isSavepoint()
                                                        == savepointsRequested))
                        .sorted(oldestFirst)
                        .collect(Collectors.toList());

        // The newest completed candidate is protected from clean-up (null if none completed).
        var newestCompleted =
                candidates.stream()
                        .filter(s -> s.getStatus() != null)
                        .filter(s -> COMPLETED.equals(s.getStatus().getState()))
                        .max(oldestFirst)
                        .orElse(null);

        var retainLimit = getMaxCountForSnapshotType(observeConfig, operatorConfig, snapshotType);
        var cutoffMillis = getMinAgeForSnapshotType(observeConfig, operatorConfig, snapshotType);

        final int candidateCount = candidates.size();
        if (candidateCount < 2) {
            return new HashSet<>();
        }

        var selected = new HashSet<FlinkStateSnapshot>();
        for (var candidate : candidates) {
            if (!candidate.equals(newestCompleted)) {
                // Everything older has already been selected: keep this newest candidate,
                // even if it is not complete.
                if (selected.size() == candidateCount - 1) {
                    break;
                }

                var snapshotMillis = EXTRACT_SNAPSHOT_TIME.apply(candidate).toEpochMilli();
                boolean overLimit = candidateCount - selected.size() > retainLimit;
                if (overLimit || snapshotMillis < cutoffMillis) {
                    selected.add(candidate);
                }
            }
        }

        return selected;
    }