in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java [214:261]
void cleanupSavepointHistory(FlinkResourceContext<CR> ctx, SavepointInfo currentSavepointInfo) {
var observeConfig = ctx.getObserveConfig();
var flinkService = ctx.getFlinkService();
boolean savepointCleanupEnabled =
observeConfig.getBoolean(
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_CLEANUP_ENABLED);
// maintain history
List<Savepoint> savepointHistory = currentSavepointInfo.getSavepointHistory();
if (savepointHistory.size() < 2) {
return;
}
var lastSavepoint = savepointHistory.get(savepointHistory.size() - 1);
int maxCount =
Math.max(
1,
ConfigOptionUtils.getValueWithThreshold(
observeConfig,
KubernetesOperatorConfigOptions
.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT,
ctx.getOperatorConfig().getSavepointHistoryCountThreshold()));
while (savepointHistory.size() > maxCount) {
// remove oldest entries
Savepoint sp = savepointHistory.remove(0);
if (savepointCleanupEnabled) {
disposeSavepointQuietly(flinkService, sp, observeConfig);
}
}
Duration maxAge =
ConfigOptionUtils.getValueWithThreshold(
observeConfig,
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE,
ctx.getOperatorConfig().getSavepointHistoryAgeThreshold());
long maxTms = System.currentTimeMillis() - maxAge.toMillis();
Iterator<Savepoint> it = savepointHistory.iterator();
while (it.hasNext()) {
Savepoint sp = it.next();
if (sp.getTimeStamp() < maxTms && sp != lastSavepoint) {
it.remove();
if (savepointCleanupEnabled) {
disposeSavepointQuietly(flinkService, sp, observeConfig);
}
}
}
}