in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java [377:463]
public void cancelSessionJob(
        FlinkSessionJob sessionJob, UpgradeMode upgradeMode, Configuration conf)
        throws Exception {
    // Cancels (or suspends) the Flink job referenced by the session job status and records
    // the outcome in that status.
    //
    // Parameters:
    //   sessionJob  - resource whose status supplies the job id and receives the updates.
    //   upgradeMode - STATELESS cancels the job; SAVEPOINT stops it with a savepoint;
    //                 every other mode (including LAST_STATE) is rejected.
    //   conf        - used to create the cluster client and to read the savepoint directory,
    //                 checkpointing timeout, Flink version and savepoint format type.
    //
    // Behaviour:
    //   * If the job is already in a terminal state nothing is sent to the cluster.
    //   * In all non-throwing paths the job state in the status is set to FINISHED, and a
    //     savepoint taken here is stored as the last savepoint with trigger type UPGRADE.
    //
    // Throws:
    //   NullPointerException - job id is missing, or SAVEPOINT mode without a configured
    //                          savepoint directory.
    //   FlinkException       - stop-with-savepoint did not complete within the checkpointing
    //                          timeout.
    //   RuntimeException     - unsupported upgrade mode, or SAVEPOINT mode requested for a
    //                          job that is neither terminal nor running.
    //   Exception            - anything else raised by the cluster client or the futures.
    var sessionJobStatus = sessionJob.getStatus();
    var jobStatus = sessionJobStatus.getJobStatus();
    var jobIdString = jobStatus.getJobId();
    Preconditions.checkNotNull(
            jobIdString, "The job id of the job to be suspended should not be null");
    var jobId = JobID.fromHexString(jobIdString);
    Optional<String> savepointOpt = Optional.empty();

    LOG.debug("Current job state: {}", jobStatus.getState());

    if (!ReconciliationUtils.isJobInTerminalState(sessionJobStatus)) {
        LOG.debug("Job is not in terminal state, cancelling it");

        try (var clusterClient = getClusterClient(conf)) {
            final String clusterId = clusterClient.getClusterId();
            switch (upgradeMode) {
                case STATELESS:
                    LOG.info("Cancelling job.");
                    // Block until the cancel request completes or the operator-level
                    // cancel timeout elapses.
                    clusterClient
                            .cancel(jobId)
                            .get(
                                    operatorConfig.getFlinkCancelJobTimeout().toSeconds(),
                                    TimeUnit.SECONDS);
                    LOG.info("Job successfully cancelled.");
                    break;
                case SAVEPOINT:
                    if (ReconciliationUtils.isJobRunning(sessionJobStatus)) {
                        LOG.info("Suspending job with savepoint.");
                        // Fail with an actionable message instead of a bare NPE when no
                        // savepoint directory has been configured.
                        final String savepointDirectory =
                                Preconditions.checkNotNull(
                                        conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY),
                                        "Savepoint directory ('"
                                                + CheckpointingOptions.SAVEPOINT_DIRECTORY.key()
                                                + "') must be configured to suspend a job"
                                                + " with savepoint");
                        final long timeout =
                                conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT)
                                        .toSeconds();
                        // The savepoint format type is only passed for Flink versions newer
                        // than 1.14; for older versions null is passed instead.
                        final var savepointFormatType =
                                conf.get(FLINK_VERSION).isNewerVersionThan(FlinkVersion.v1_14)
                                        ? conf.get(
                                                KubernetesOperatorConfigOptions
                                                        .OPERATOR_SAVEPOINT_FORMAT_TYPE)
                                        : null;
                        try {
                            // Second argument is 'false': per the Flink ClusterClient API
                            // this is presumably advanceToEndOfEventTime - confirm against
                            // the Flink version in use.
                            String savepoint =
                                    clusterClient
                                            .stopWithSavepoint(
                                                    jobId,
                                                    false,
                                                    savepointDirectory,
                                                    savepointFormatType)
                                            .get(timeout, TimeUnit.SECONDS);
                            savepointOpt = Optional.of(savepoint);
                            LOG.info(
                                    "Job successfully suspended with savepoint {}.", savepoint);
                        } catch (TimeoutException exception) {
                            // Translate the raw timeout into an error that tells the user
                            // which option to tune, keeping the original cause.
                            throw new FlinkException(
                                    String.format(
                                            "Timed out stopping the job %s in Flink cluster %s with savepoint, "
                                                    + "please configure a larger timeout via '%s'",
                                            jobId,
                                            clusterId,
                                            ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT
                                                    .key()),
                                    exception);
                        }
                    } else {
                        // Non-terminal but not running: a savepoint cannot be taken.
                        throw new RuntimeException(
                                "Unexpected non-terminal status: " + jobStatus.getState());
                    }
                    break;
                case LAST_STATE:
                default:
                    throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
            }
        }
    } else {
        LOG.debug("Job is in terminal state, skipping cancel");
    }

    // Reached only when no exception was thrown above: mark the job as finished and, if a
    // savepoint was produced, remember it as the latest one for the upcoming upgrade.
    jobStatus.setState(JobStatus.FINISHED.name());
    savepointOpt.ifPresent(
            location -> {
                Savepoint sp = Savepoint.of(location, SnapshotTriggerType.UPGRADE);
                jobStatus.getSavepointInfo().updateLastSavepoint(sp);
            });
}