public DeleteControl cleanupInternal()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java [119:168]


    /**
     * Handles deletion of a session job resource.
     *
     * <p>Returns {@link DeleteControl#defaultDelete()} when there is nothing left to cancel: the
     * job was never deployed, is already in a terminal state, was last reconciled as {@code
     * SUSPENDED}, is reported as not found, has no job id, or no session cluster deployment is
     * available as a secondary resource. Otherwise a cancellation is attempted and, while it is
     * pending (or if it fails), the finalizer is kept and the cleanup is rescheduled after the
     * operator's progress check interval.
     *
     * @param ctx resource context of the session job being deleted
     * @return the delete control describing whether to proceed with deletion or retry later
     */
    public DeleteControl cleanupInternal(FlinkResourceContext<FlinkSessionJob> ctx) {
        var status = ctx.getResource().getStatus();
        long delay = ctx.getOperatorConfig().getProgressCheckInterval().toMillis();

        // The checks below are evaluated strictly in this order; in particular the last
        // reconciled spec is only read once we know a deployment has happened.
        if (status.getReconciliationStatus().isBeforeFirstDeployment()) {
            return DeleteControl.defaultDelete();
        }
        if (ReconciliationUtils.isJobInTerminalState(status)) {
            return DeleteControl.defaultDelete();
        }
        var lastReconciledJobState =
                status.getReconciliationStatus()
                        .deserializeLastReconciledSpec()
                        .getJob()
                        .getState();
        if (lastReconciledJobState == JobState.SUSPENDED) {
            return DeleteControl.defaultDelete();
        }
        if (JobStatusObserver.JOB_NOT_FOUND_ERR.equals(status.getError())) {
            return DeleteControl.defaultDelete();
        }

        // A cancellation is already in flight: keep the finalizer and check again later.
        if (ReconciliationUtils.isJobCancelling(status)) {
            LOG.info("Waiting for pending cancellation");
            return DeleteControl.noFinalizerRemoval().rescheduleAfter(delay);
        }

        Optional<FlinkDeployment> sessionCluster =
                ctx.getJosdkContext().getSecondaryResource(FlinkDeployment.class);
        if (!sessionCluster.isPresent()) {
            LOG.info("Session cluster deployment not available");
            return DeleteControl.defaultDelete();
        }

        String jobId = ctx.getResource().getStatus().getJobStatus().getJobId();
        if (jobId == null) {
            // No job id recorded, so there is nothing to cancel.
            return DeleteControl.defaultDelete();
        }

        try {
            var conf = ctx.getObserveConfig();
            boolean savepointOnDeletion =
                    conf.getBoolean(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION);
            var mode = savepointOnDeletion ? SuspendMode.SAVEPOINT : SuspendMode.STATELESS;
            if (cancelJob(ctx, mode)) {
                LOG.info("Waiting for pending cancellation");
                return DeleteControl.noFinalizerRemoval().rescheduleAfter(delay);
            }
        } catch (Exception ex) {
            LOG.error(
                    "Failed to cancel job, will reschedule after {} milliseconds.", delay, ex);
            return DeleteControl.noFinalizerRemoval().rescheduleAfter(delay);
        }

        return DeleteControl.defaultDelete();
    }