in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/snapshot/StateSnapshotReconciler.java [98:137]
/**
 * Decides how deletion of the snapshot resource held by {@code ctx} should proceed.
 *
 * <p>The resource is released with {@link DeleteControl#defaultDelete()} right away when it is a
 * checkpoint, when its savepoint spec has dispose-on-delete switched off, or when it carries no
 * job reference (or a job reference without a name). Otherwise the outcome depends on the
 * snapshot state:
 *
 * <ul>
 *   <li>{@code IN_PROGRESS}: the finalizer is kept and cleanup is rescheduled after the
 *       operator's reconcile interval.
 *   <li>{@code COMPLETED}: the decision is delegated to {@code handleSnapshotCleanup} together
 *       with the result of {@code getFlinkDeployment}.
 *   <li>{@code FAILED}, {@code TRIGGER_PENDING}, {@code ABANDONED} and any other state: the
 *       resource is released without disposal.
 * </ul>
 *
 * @param ctx context giving access to the snapshot resource and the operator configuration
 * @return the delete control describing whether the finalizer may be removed now
 * @throws Exception declared by the signature; presumably propagated from the deployment lookup
 *     or the cleanup handler (their bodies are not visible here)
 */
public DeleteControl cleanup(FlinkStateSnapshotContext ctx) throws Exception {
    var snapshot = ctx.getResource();
    var state = snapshot.getStatus().getState();
    var snapshotName = snapshot.getMetadata().getName();
    LOG.info("Cleaning up resource {}...", snapshotName);

    var spec = snapshot.getSpec();
    // Evaluated left to right with short-circuiting, so the savepoint spec is only consulted
    // for non-checkpoint snapshots and the job reference only when disposal is requested.
    boolean nothingToDispose =
            spec.isCheckpoint()
                    || !spec.getSavepoint().getDisposeOnDelete()
                    || spec.getJobReference() == null
                    || spec.getJobReference().getName() == null;
    if (nothingToDispose) {
        return DeleteControl.defaultDelete();
    }

    switch (state) {
        case COMPLETED:
            return handleSnapshotCleanup(snapshot, getFlinkDeployment(ctx), ctx);
        case IN_PROGRESS:
            // Keep the finalizer in place and check again on the next reconcile interval.
            LOG.info(
                    "Cannot delete resource {} yet as savepoint is still in progress...",
                    snapshotName);
            return DeleteControl.noFinalizerRemoval()
                    .rescheduleAfter(ctx.getOperatorConfig().getReconcileInterval().toMillis());
        case TRIGGER_PENDING:
        case FAILED:
        case ABANDONED:
            LOG.info(
                    "Savepoint state is {}, cleaning up resource {} without disposal...",
                    state.name(),
                    snapshotName);
            return DeleteControl.defaultDelete();
        default:
            LOG.info("Unknown savepoint state for {}: {}", snapshotName, state);
            return DeleteControl.defaultDelete();
    }
}