public String savepointJobOrError()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java [415:465]


    public String savepointJobOrError(
            RestClusterClient<String> clusterClient, CommonStatus<?> status, Configuration conf) {
        var jobID = JobID.fromHexString(status.getJobStatus().getJobId());
        String savepointDirectory = conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY);
        var savepointFormatType =
                conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE);
        long timeout = conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT).getSeconds();
        String savepointPath;
        if (ReconciliationUtils.isJobRunning(status)) {
            LOG.info("Suspending job with savepoint");
            try {
                savepointPath =
                        clusterClient
                                .stopWithSavepoint(
                                        jobID,
                                        conf.getBoolean(
                                                KubernetesOperatorConfigOptions
                                                        .DRAIN_ON_SAVEPOINT_DELETION),
                                        savepointDirectory,
                                        savepointFormatType)
                                .get(timeout, TimeUnit.SECONDS);
            } catch (TimeoutException exception) {
                throw new UpgradeFailureException(
                        String.format(
                                "Timed out stopping the job %s with savepoint, "
                                        + "please configure a larger timeout via '%s'",
                                jobID, ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.key()),
                        EventRecorder.Reason.SavepointError.name(),
                        exception);
            } catch (Exception e) {
                var stopWithSavepointException =
                        ExceptionUtils.findThrowableSerializedAware(
                                e,
                                StopWithSavepointStoppingException.class,
                                getClass().getClassLoader());
                if (stopWithSavepointException.isPresent()) {
                    // Handle edge case where the savepoint completes but the job fails
                    // right afterward.
                    savepointPath = stopWithSavepointException.get().getSavepointPath();
                } else {
                    // Rethrow if savepoint was not completed successfully.
                    throw new UpgradeFailureException(
                            "Savepoint Error", EventRecorder.Reason.SavepointError.name(), e);
                }
            }
        } else {
            throw new RuntimeException("Unexpected job status: " + status);
        }
        LOG.info("Job successfully suspended with savepoint {}", savepointPath);
        return savepointPath;
    }