in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java [119:168]
public DeleteControl cleanupInternal(FlinkResourceContext<FlinkSessionJob> ctx) {
var status = ctx.getResource().getStatus();
long delay = ctx.getOperatorConfig().getProgressCheckInterval().toMillis();
if (status.getReconciliationStatus().isBeforeFirstDeployment()
|| ReconciliationUtils.isJobInTerminalState(status)
|| status.getReconciliationStatus()
.deserializeLastReconciledSpec()
.getJob()
.getState()
== JobState.SUSPENDED
|| JobStatusObserver.JOB_NOT_FOUND_ERR.equals(status.getError())) {
// Job is not running, nothing to do...
return DeleteControl.defaultDelete();
}
if (ReconciliationUtils.isJobCancelling(status)) {
LOG.info("Waiting for pending cancellation");
return DeleteControl.noFinalizerRemoval().rescheduleAfter(delay);
}
Optional<FlinkDeployment> flinkDepOptional =
ctx.getJosdkContext().getSecondaryResource(FlinkDeployment.class);
if (flinkDepOptional.isPresent()) {
String jobID = ctx.getResource().getStatus().getJobStatus().getJobId();
if (jobID != null) {
try {
var observeConfig = ctx.getObserveConfig();
var suspendMode =
observeConfig.getBoolean(
KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION)
? SuspendMode.SAVEPOINT
: SuspendMode.STATELESS;
if (cancelJob(ctx, suspendMode)) {
LOG.info("Waiting for pending cancellation");
return DeleteControl.noFinalizerRemoval().rescheduleAfter(delay);
}
} catch (Exception e) {
LOG.error(
"Failed to cancel job, will reschedule after {} milliseconds.",
delay,
e);
return DeleteControl.noFinalizerRemoval().rescheduleAfter(delay);
}
}
} else {
LOG.info("Session cluster deployment not available");
}
return DeleteControl.defaultDelete();
}