in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java [97:139]
public DeleteControl cleanupInternal(FlinkResourceContext<FlinkSessionJob> ctx) {
Optional<FlinkDeployment> flinkDepOptional =
ctx.getJosdkContext().getSecondaryResource(FlinkDeployment.class);
if (flinkDepOptional.isPresent()) {
String jobID = ctx.getResource().getStatus().getJobStatus().getJobId();
if (jobID != null) {
try {
UpgradeMode upgradeMode =
ctx.getOperatorConfig().isSavepointOnDeletion()
? UpgradeMode.SAVEPOINT
: UpgradeMode.STATELESS;
cancelJob(ctx, upgradeMode);
} catch (ExecutionException e) {
final var cause = e.getCause();
if (cause instanceof FlinkJobNotFoundException) {
LOG.error("Job {} not found in the Flink cluster.", jobID, e);
return DeleteControl.defaultDelete();
}
if (cause instanceof FlinkJobTerminatedWithoutCancellationException) {
LOG.error("Job {} already terminated without cancellation.", jobID, e);
return DeleteControl.defaultDelete();
}
final long delay =
ctx.getOperatorConfig().getProgressCheckInterval().toMillis();
LOG.error(
"Failed to cancel job {}, will reschedule after {} milliseconds.",
jobID,
delay,
e);
return DeleteControl.noFinalizerRemoval().rescheduleAfter(delay);
} catch (Exception e) {
LOG.error("Failed to cancel job {}.", jobID, e);
}
}
} else {
LOG.info("Session cluster deployment not available");
}
return DeleteControl.defaultDelete();
}