public DeleteControl cleanupInternal()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java [97:139]


    /**
     * Cleans up a session job resource that is being deleted.
     *
     * <p>When a {@code FlinkDeployment} secondary resource is present and the resource status
     * carries a job id, the job is cancelled through {@code cancelJob}, using {@code
     * UpgradeMode.SAVEPOINT} if the operator configuration enables savepoint-on-deletion and
     * {@code UpgradeMode.STATELESS} otherwise.
     *
     * @param ctx context of the session job resource being cleaned up
     * @return {@code DeleteControl.defaultDelete()} when there is nothing to cancel, when the
     *     cancellation succeeds, when the job is reported as not found or already terminated
     *     without cancellation, or when a non-{@code ExecutionException} failure occurs; {@code
     *     DeleteControl.noFinalizerRemoval()} rescheduled after the configured progress check
     *     interval for any other {@code ExecutionException}
     */
    public DeleteControl cleanupInternal(FlinkResourceContext<FlinkSessionJob> ctx) {
        final boolean sessionClusterPresent =
                ctx.getJosdkContext().getSecondaryResource(FlinkDeployment.class).isPresent();
        if (!sessionClusterPresent) {
            LOG.info("Session cluster deployment not available");
            return DeleteControl.defaultDelete();
        }

        final String jobId = ctx.getResource().getStatus().getJobStatus().getJobId();
        if (jobId == null) {
            // No job id recorded in the status, so there is nothing to cancel.
            return DeleteControl.defaultDelete();
        }

        try {
            // The config lookup stays inside the try so its failures are handled like
            // cancellation failures.
            if (ctx.getOperatorConfig().isSavepointOnDeletion()) {
                cancelJob(ctx, UpgradeMode.SAVEPOINT);
            } else {
                cancelJob(ctx, UpgradeMode.STATELESS);
            }
        } catch (ExecutionException executionFailure) {
            final Throwable rootCause = executionFailure.getCause();

            if (rootCause instanceof FlinkJobNotFoundException) {
                LOG.error("Job {} not found in the Flink cluster.", jobId, executionFailure);
                return DeleteControl.defaultDelete();
            }

            if (rootCause instanceof FlinkJobTerminatedWithoutCancellationException) {
                LOG.error(
                        "Job {} already terminated without cancellation.",
                        jobId,
                        executionFailure);
                return DeleteControl.defaultDelete();
            }

            // Any other execution failure: keep the finalizer and try again later.
            final long retryDelayMillis =
                    ctx.getOperatorConfig().getProgressCheckInterval().toMillis();
            LOG.error(
                    "Failed to cancel job {}, will reschedule after {} milliseconds.",
                    jobId,
                    retryDelayMillis,
                    executionFailure);
            return DeleteControl.noFinalizerRemoval().rescheduleAfter(retryDelayMillis);
        } catch (Exception unexpectedFailure) {
            // Logged only; execution falls through to the default delete below.
            LOG.error("Failed to cancel job {}.", jobId, unexpectedFailure);
        }

        return DeleteControl.defaultDelete();
    }