in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java [415:465]
/**
 * Stops the job referenced by {@code status} with a savepoint and returns the savepoint path.
 *
 * <p>The job is stopped through {@link RestClusterClient#stopWithSavepoint} using the savepoint
 * directory from {@link CheckpointingOptions#SAVEPOINT_DIRECTORY}, the format type from
 * {@link KubernetesOperatorConfigOptions#OPERATOR_SAVEPOINT_FORMAT_TYPE} and the drain flag from
 * {@link KubernetesOperatorConfigOptions#DRAIN_ON_SAVEPOINT_DELETION}. The result is awaited for
 * at most {@link ExecutionCheckpointingOptions#CHECKPOINTING_TIMEOUT}.
 *
 * <p>If the request fails with a {@code StopWithSavepointStoppingException} (the savepoint was
 * written but the job failed right afterwards), the savepoint path carried by that exception is
 * returned instead of failing.
 *
 * @param clusterClient REST client used to issue the stop-with-savepoint request
 * @param status status whose job status holds the hex id of the job to stop
 * @param conf configuration supplying savepoint directory, format type, drain flag and timeout
 * @return the path of the completed savepoint
 * @throws UpgradeFailureException if the request times out, the waiting thread is interrupted,
 *     or the request fails without a savepoint having been completed
 * @throws RuntimeException if {@code ReconciliationUtils.isJobRunning(status)} is false
 */
public String savepointJobOrError(
        RestClusterClient<String> clusterClient, CommonStatus<?> status, Configuration conf) {
    var jobID = JobID.fromHexString(status.getJobStatus().getJobId());
    String savepointDirectory = conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY);
    var savepointFormatType =
            conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE);
    // Use millisecond precision: Duration#getSeconds() truncates, which would turn a sub-second
    // timeout into 0 and make the wait below fail immediately.
    long timeoutMillis =
            conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT).toMillis();

    if (!ReconciliationUtils.isJobRunning(status)) {
        throw new RuntimeException("Unexpected job status: " + status);
    }

    LOG.info("Suspending job with savepoint");
    String savepointPath;
    try {
        savepointPath =
                clusterClient
                        .stopWithSavepoint(
                                jobID,
                                conf.get(
                                        KubernetesOperatorConfigOptions
                                                .DRAIN_ON_SAVEPOINT_DELETION),
                                savepointDirectory,
                                savepointFormatType)
                        .get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException exception) {
        throw new UpgradeFailureException(
                String.format(
                        "Timed out stopping the job %s with savepoint, "
                                + "please configure a larger timeout via '%s'",
                        jobID, ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.key()),
                EventRecorder.Reason.SavepointError.name(),
                exception);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the calling thread / executor still observes the
        // interruption; previously it was swallowed by the generic catch below.
        Thread.currentThread().interrupt();
        throw new UpgradeFailureException(
                "Savepoint Error", EventRecorder.Reason.SavepointError.name(), e);
    } catch (Exception e) {
        var stopWithSavepointException =
                ExceptionUtils.findThrowableSerializedAware(
                        e,
                        StopWithSavepointStoppingException.class,
                        getClass().getClassLoader());
        if (stopWithSavepointException.isPresent()) {
            // Handle edge case where the savepoint completes but the job fails
            // right afterward.
            savepointPath = stopWithSavepointException.get().getSavepointPath();
        } else {
            // Rethrow if savepoint was not completed successfully.
            throw new UpgradeFailureException(
                    "Savepoint Error", EventRecorder.Reason.SavepointError.name(), e);
        }
    }
    LOG.info("Job successfully suspended with savepoint {}", savepointPath);
    return savepointPath;
}