in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java [334:391]
/**
 * Prunes the legacy savepoint history stored in the status of the resource held by {@code ctx}.
 *
 * <p>The history is limited in two ways:
 *
 * <ul>
 *   <li><b>By count</b> - the configured maximum for {@code SAVEPOINT} snapshots is first reduced
 *       by the number of completed savepoint {@link FlinkStateSnapshot} resources (never below
 *       zero); entries are then removed from the front of the history until it fits.
 *   <li><b>By age</b> - remaining entries whose timestamp is below the threshold returned by
 *       {@code getMinAgeForSnapshotType} are removed.
 * </ul>
 *
 * <p>While no completed savepoint {@link FlinkStateSnapshot} resource exists, the last history
 * entry is exempt from the age-based removal. Every removed entry is additionally passed to
 * {@code disposeSavepointQuietly} when {@code OPERATOR_SAVEPOINT_CLEANUP_ENABLED} is set in the
 * observe config.
 *
 * @param ctx context giving access to the resource, the observe config and the operator config
 * @param allSecondarySnapshotResources snapshot resources to inspect; only those with a
 *     non-null status in state {@code COMPLETED} whose spec is a savepoint are counted
 */
void cleanupSavepointHistoryLegacy(
        FlinkResourceContext<CR> ctx, Set<FlinkStateSnapshot> allSecondarySnapshotResources) {
    var ageThreshold =
            getMinAgeForSnapshotType(ctx.getObserveConfig(), ctx.getOperatorConfig(), SAVEPOINT);
    var configuredMaxCount =
            getMaxCountForSnapshotType(ctx.getObserveConfig(), ctx.getOperatorConfig(), SAVEPOINT);

    // Completed savepoint CRs count against the same budget as the legacy history entries.
    var completedCrCount =
            allSecondarySnapshotResources.stream()
                    .filter(
                            snapshot ->
                                    snapshot.getStatus() != null
                                            && COMPLETED.equals(snapshot.getStatus().getState())
                                            && snapshot.getSpec().isSavepoint())
                    .count();
    var legacyBudget = Math.max(0, configuredMaxCount - completedCrCount);

    var legacyHistory =
            ctx.getResource().getStatus().getJobStatus().getSavepointInfo().getSavepointHistory();
    var disposeRemoved = ctx.getObserveConfig().get(OPERATOR_SAVEPOINT_CLEANUP_ENABLED);
    var noCompletedCrs = completedCrCount == 0;

    // Nothing to prune at all.
    if (legacyHistory.isEmpty()) {
        return;
    }
    // A lone legacy entry is only eligible for cleanup once a completed savepoint CR exists.
    if (legacyHistory.size() == 1 && noCompletedCrs) {
        return;
    }

    var newestEntry = legacyHistory.get(legacyHistory.size() - 1);

    // Count-based pruning: drop entries from the front (oldest first) until within budget.
    while (legacyHistory.size() > legacyBudget) {
        var evicted = legacyHistory.remove(0);
        if (disposeRemoved) {
            disposeSavepointQuietly(ctx, evicted.getLocation());
        }
    }

    // Age-based pruning: the newest entry is skipped while no completed savepoint CR exists.
    for (var cursor = legacyHistory.iterator(); cursor.hasNext(); ) {
        var entry = cursor.next();
        boolean protectedNewest = noCompletedCrs && entry == newestEntry;
        if (!protectedNewest && entry.getTimeStamp() < ageThreshold) {
            cursor.remove();
            if (disposeRemoved) {
                disposeSavepointQuietly(ctx, entry.getLocation());
            }
        }
    }
}