in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java [267:374]
/**
 * Stops the job of the given deployment according to the requested upgrade mode and updates the
 * deployment status in place.
 *
 * <ul>
 *   <li>{@code STATELESS}: if the job is running it is cancelled on a best-effort basis (any
 *       failure is only logged), then the cluster deployment is deleted.
 *   <li>{@code SAVEPOINT}: if the job is running it is stopped with a savepoint written to the
 *       configured savepoint directory; if the job is already in a terminal state the savepoint
 *       is skipped. The cluster deployment is deleted only when {@code
 *       deleteClusterAfterSavepoint} is set.
 *   <li>{@code LAST_STATE}: the cluster deployment is deleted without stopping the job first.
 * </ul>
 *
 * <p>Afterwards the job state is set to {@code FINISHED} and any savepoint taken is recorded as
 * the last savepoint (trigger type {@code UPGRADE}). Unless cluster shutdown is disabled for the
 * deployed spec (never the case for {@code LAST_STATE}), the method waits for the cluster to shut
 * down and marks the JobManager deployment as {@code MISSING}.
 *
 * @param deployment the deployment whose job should be stopped; its status is mutated
 * @param upgradeMode how the job should be stopped
 * @param conf Flink configuration used to create the cluster client and to read the savepoint
 *     directory, checkpoint timeout, savepoint format type and Flink version
 * @param deleteClusterAfterSavepoint whether to delete the cluster deployment after
 *     stop-with-savepoint; only consulted in {@code SAVEPOINT} mode
 * @throws FlinkException if stop-with-savepoint does not complete within the checkpointing
 *     timeout
 * @throws NullPointerException in {@code SAVEPOINT} mode if no savepoint directory is configured,
 *     or if the job is running but has no job id
 * @throws RuntimeException in {@code SAVEPOINT} mode if the job is neither running nor terminal,
 *     or if the upgrade mode is not supported
 * @throws Exception propagated from the cluster client, cluster deletion or shutdown wait
 */
protected void cancelJob(
        FlinkDeployment deployment,
        UpgradeMode upgradeMode,
        Configuration conf,
        boolean deleteClusterAfterSavepoint)
        throws Exception {
    var deploymentStatus = deployment.getStatus();
    var jobIdString = deploymentStatus.getJobStatus().getJobId();
    var jobId = jobIdString != null ? JobID.fromHexString(jobIdString) : null;
    Optional<String> savepointOpt = Optional.empty();
    var savepointFormatType = SnapshotUtils.getSavepointFormatType(conf);
    try (var clusterClient = getClusterClient(conf)) {
        var clusterId = clusterClient.getClusterId();
        switch (upgradeMode) {
            case STATELESS:
                if (ReconciliationUtils.isJobRunning(deploymentStatus)) {
                    LOG.info("Job is running, cancelling job.");
                    try {
                        // Millisecond precision: converting to whole seconds would turn a
                        // sub-second timeout into 0 and time out immediately.
                        clusterClient
                                .cancel(Preconditions.checkNotNull(jobId))
                                .get(
                                        operatorConfig.getFlinkCancelJobTimeout().toMillis(),
                                        TimeUnit.MILLISECONDS);
                        LOG.info("Job successfully cancelled.");
                    } catch (Exception e) {
                        // Best effort: any failure (missing job id, timeout, cancel error) is
                        // logged and the cluster deployment is deleted regardless.
                        // NOTE(review): this also catches InterruptedException, which clears the
                        // thread's interrupt flag — confirm that is intended.
                        LOG.error("Could not shut down cluster gracefully, deleting...", e);
                    }
                }
                deleteClusterDeployment(deployment.getMetadata(), deploymentStatus, conf, true);
                break;
            case SAVEPOINT:
                final String savepointDirectory =
                        Preconditions.checkNotNull(
                                conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY));
                // Millisecond precision for the same reason as the cancel timeout above.
                final long timeoutMillis =
                        conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT).toMillis();
                if (ReconciliationUtils.isJobRunning(deploymentStatus)) {
                    try {
                        LOG.info("Suspending job with savepoint.");
                        String savepoint =
                                clusterClient
                                        .stopWithSavepoint(
                                                Preconditions.checkNotNull(jobId),
                                                false,
                                                savepointDirectory,
                                                // The format type is only passed to Flink
                                                // versions newer than 1.14.
                                                conf.get(FLINK_VERSION)
                                                                .isNewerVersionThan(
                                                                        FlinkVersion.v1_14)
                                                        ? savepointFormatType
                                                        : null)
                                        .get(timeoutMillis, TimeUnit.MILLISECONDS);
                        savepointOpt = Optional.of(savepoint);
                        LOG.info("Job successfully suspended with savepoint {}.", savepoint);
                    } catch (TimeoutException exception) {
                        throw new FlinkException(
                                String.format(
                                        "Timed out stopping the job %s in Flink cluster %s with savepoint, "
                                                + "please configure a larger timeout via '%s'",
                                        jobId,
                                        clusterId,
                                        ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT
                                                .key()),
                                exception);
                    }
                } else if (ReconciliationUtils.isJobInTerminalState(deploymentStatus)) {
                    LOG.info(
                            "Job is already in terminal state skipping cancel-with-savepoint operation.");
                } else {
                    throw new RuntimeException(
                            "Unexpected non-terminal status: " + deploymentStatus);
                }
                if (deleteClusterAfterSavepoint) {
                    LOG.info("Cleaning up deployment after stop-with-savepoint");
                    deleteClusterDeployment(
                            deployment.getMetadata(), deploymentStatus, conf, true);
                }
                break;
            case LAST_STATE:
                // The job is not stopped here; the final argument differs from the other modes
                // (presumably it controls HA metadata cleanup — confirm in deleteClusterDeployment).
                deleteClusterDeployment(
                        deployment.getMetadata(), deploymentStatus, conf, false);
                break;
            default:
                throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
        }
    }
    deploymentStatus.getJobStatus().setState(JobStatus.FINISHED.name());
    // Record the savepoint taken during the upgrade. The format type is converted by enum name
    // into the SavepointFormatType used by the savepoint info.
    savepointOpt.ifPresent(
            location -> {
                Savepoint sp =
                        Savepoint.of(
                                location,
                                SnapshotTriggerType.UPGRADE,
                                SavepointFormatType.valueOf(savepointFormatType.name()));
                deploymentStatus.getJobStatus().getSavepointInfo().updateLastSavepoint(sp);
            });
    // LAST_STATE never counts as shutdown-disabled, so the wait below always runs for it.
    var shutdownDisabled =
            upgradeMode != UpgradeMode.LAST_STATE
                    && FlinkUtils.clusterShutdownDisabled(
                            ReconciliationUtils.getDeployedSpec(deployment));
    if (!shutdownDisabled) {
        waitForClusterShutdown(conf);
        deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
    }
}