in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java [273:323]
Set<FlinkStateSnapshot> getFlinkStateSnapshotsToCleanUp(
Collection<FlinkStateSnapshot> snapshots,
Configuration observeConfig,
FlinkOperatorConfiguration operatorConfig,
SnapshotType snapshotType) {
var snapshotList =
snapshots.stream()
.filter(s -> !s.isMarkedForDeletion())
.filter(
s ->
CLEAN_UP_SNAPSHOT_TRIGGER_TYPES.contains(
FlinkStateSnapshotUtils.getSnapshotTriggerType(s)))
.filter(s -> (s.getSpec().isSavepoint() == (snapshotType == SAVEPOINT)))
.sorted(Comparator.comparing(EXTRACT_SNAPSHOT_TIME))
.collect(Collectors.toList());
var lastCompleteSnapshot =
snapshotList.stream()
.filter(
s ->
s.getStatus() != null
&& COMPLETED.equals(s.getStatus().getState()))
.max(Comparator.comparing(EXTRACT_SNAPSHOT_TIME))
.orElse(null);
var maxCount = getMaxCountForSnapshotType(observeConfig, operatorConfig, snapshotType);
var maxTms = getMinAgeForSnapshotType(observeConfig, operatorConfig, snapshotType);
var result = new HashSet<FlinkStateSnapshot>();
if (snapshotList.size() < 2) {
return result;
}
for (var snapshot : snapshotList) {
if (snapshot.equals(lastCompleteSnapshot)) {
continue;
}
// We should keep the last snapshot, even if not complete.
if (result.size() == snapshotList.size() - 1) {
break;
}
var ts = EXTRACT_SNAPSHOT_TIME.apply(snapshot).toEpochMilli();
if (snapshotList.size() - result.size() > maxCount || ts < maxTms) {
result.add(snapshot);
}
}
return result;
}