public DeleteControl cleanup()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/snapshot/StateSnapshotReconciler.java [98:137]


    /**
     * Handles deletion of the snapshot resource carried by the given context.
     *
     * <p>{@code DeleteControl.defaultDelete()} is returned immediately when the spec is a
     * checkpoint, when the savepoint spec does not request dispose-on-delete, or when the job
     * reference (or its name) is absent. Otherwise the result depends on the status state:
     *
     * <ul>
     *   <li>{@code COMPLETED}: the outcome of {@code handleSnapshotCleanup} is returned.
     *   <li>{@code IN_PROGRESS}: {@code DeleteControl.noFinalizerRemoval()} is returned,
     *       rescheduled after the operator's reconcile interval.
     *   <li>{@code FAILED}, {@code TRIGGER_PENDING}, {@code ABANDONED} and any other state:
     *       {@code DeleteControl.defaultDelete()} is returned after logging.
     * </ul>
     *
     * @param ctx context providing the resource under deletion and the operator configuration
     * @return the delete control describing how deletion should proceed
     * @throws Exception declared by the signature; the concrete sources are the calls made
     *     during cleanup and are not visible from this method alone
     */
    public DeleteControl cleanup(FlinkStateSnapshotContext ctx) throws Exception {
        var snapshot = ctx.getResource();
        var snapshotState = snapshot.getStatus().getState();
        var snapshotName = snapshot.getMetadata().getName();
        LOG.info("Cleaning up resource {}...", snapshotName);

        var spec = snapshot.getSpec();

        // Checkpoints, and savepoints without dispose-on-delete, are deleted without disposal.
        if (spec.isCheckpoint() || !spec.getSavepoint().getDisposeOnDelete()) {
            return DeleteControl.defaultDelete();
        }

        // The COMPLETED branch below is only reached when a named job reference is present.
        var jobReference = spec.getJobReference();
        if (jobReference == null || jobReference.getName() == null) {
            return DeleteControl.defaultDelete();
        }

        switch (snapshotState) {
            case COMPLETED:
                {
                    return handleSnapshotCleanup(snapshot, getFlinkDeployment(ctx), ctx);
                }
            case IN_PROGRESS:
                {
                    LOG.info(
                            "Cannot delete resource {} yet as savepoint is still in progress...",
                            snapshotName);
                    // Keep the finalizer and check again after one reconcile interval.
                    var keepFinalizer = DeleteControl.noFinalizerRemoval();
                    var retryDelayMillis =
                            ctx.getOperatorConfig().getReconcileInterval().toMillis();
                    return keepFinalizer.rescheduleAfter(retryDelayMillis);
                }
            case FAILED:
            case TRIGGER_PENDING:
            case ABANDONED:
                {
                    LOG.info(
                            "Savepoint state is {}, cleaning up resource {} without disposal...",
                            snapshotState.name(),
                            snapshotName);
                    return DeleteControl.defaultDelete();
                }
            default:
                {
                    LOG.info("Unknown savepoint state for {}: {}", snapshotName, snapshotState);
                    return DeleteControl.defaultDelete();
                }
        }
    }