public void cancelSessionJob()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java [377:463]


    /**
     * Cancels the Flink job recorded in the status of the given session job and updates that
     * status afterwards.
     *
     * <p>If the job is already in a terminal state no request is sent to the cluster. Otherwise
     * the behaviour depends on the upgrade mode:
     *
     * <ul>
     *   <li>{@code STATELESS}: the job is cancelled, waiting at most the operator's configured
     *       cancel-job timeout.
     *   <li>{@code SAVEPOINT}: the job must be running; it is stopped with a savepoint written to
     *       the configured savepoint directory, waiting at most the configured checkpointing
     *       timeout. The resulting savepoint location is recorded as the last savepoint.
     *   <li>{@code LAST_STATE} and any other mode: not supported.
     * </ul>
     *
     * <p>On success the job state in the status is set to {@code FINISHED}.
     *
     * @param sessionJob session job whose status holds the id of the job to cancel
     * @param upgradeMode how the job should be stopped
     * @param conf configuration used to create the cluster client and to look up the savepoint
     *     directory, checkpointing timeout, Flink version and savepoint format type
     * @throws FlinkException if stopping with savepoint does not complete within the timeout
     * @throws RuntimeException if the upgrade mode is unsupported, or if {@code SAVEPOINT} mode is
     *     requested for a job that is neither terminal nor running
     * @throws Exception if a precondition fails (missing job id or savepoint directory) or the
     *     cluster client call fails
     */
    public void cancelSessionJob(
            FlinkSessionJob sessionJob, UpgradeMode upgradeMode, Configuration conf)
            throws Exception {

        var sessionJobStatus = sessionJob.getStatus();
        var jobStatus = sessionJobStatus.getJobStatus();
        var jobIdString = jobStatus.getJobId();
        // It is the job id (not the job resource) that is validated here.
        Preconditions.checkNotNull(
                jobIdString, "The ID of the job to be suspended should not be null");
        var jobId = JobID.fromHexString(jobIdString);
        // Only populated when the job was stopped with a savepoint.
        Optional<String> savepointOpt = Optional.empty();

        LOG.debug("Current job state: {}", jobStatus.getState());

        if (!ReconciliationUtils.isJobInTerminalState(sessionJobStatus)) {
            LOG.debug("Job is not in terminal state, cancelling it");

            try (var clusterClient = getClusterClient(conf)) {
                final String clusterId = clusterClient.getClusterId();
                switch (upgradeMode) {
                    case STATELESS:
                        LOG.info("Cancelling job.");
                        clusterClient
                                .cancel(jobId)
                                .get(
                                        operatorConfig.getFlinkCancelJobTimeout().toSeconds(),
                                        TimeUnit.SECONDS);
                        LOG.info("Job successfully cancelled.");
                        break;
                    case SAVEPOINT:
                        if (ReconciliationUtils.isJobRunning(sessionJobStatus)) {
                            LOG.info("Suspending job with savepoint.");
                            // Fail with an actionable message instead of a bare
                            // NullPointerException when no savepoint directory is configured.
                            final String savepointDirectory =
                                    Preconditions.checkNotNull(
                                            conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY),
                                            "Savepoint directory must be configured via '"
                                                    + CheckpointingOptions.SAVEPOINT_DIRECTORY
                                                            .key()
                                                    + "' to suspend the job with savepoint");
                            final long timeout =
                                    conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT)
                                            .getSeconds();
                            try {
                                // The savepoint format type is only passed for Flink versions
                                // newer than 1.14; otherwise null is handed to the client.
                                var savepointFormatType =
                                        conf.get(FLINK_VERSION)
                                                        .isNewerVersionThan(FlinkVersion.v1_14)
                                                ? conf.get(
                                                        KubernetesOperatorConfigOptions
                                                                .OPERATOR_SAVEPOINT_FORMAT_TYPE)
                                                : null;
                                String savepoint =
                                        clusterClient
                                                .stopWithSavepoint(
                                                        jobId,
                                                        false,
                                                        savepointDirectory,
                                                        savepointFormatType)
                                                .get(timeout, TimeUnit.SECONDS);
                                savepointOpt = Optional.of(savepoint);
                                LOG.info(
                                        "Job successfully suspended with savepoint {}.", savepoint);
                            } catch (TimeoutException exception) {
                                // Surface the config key so the user knows which timeout to raise.
                                throw new FlinkException(
                                        String.format(
                                                "Timed out stopping the job %s in Flink cluster %s with savepoint, "
                                                        + "please configure a larger timeout via '%s'",
                                                jobId,
                                                clusterId,
                                                ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT
                                                        .key()),
                                        exception);
                            }
                        } else {
                            // Non-terminal but not running: a savepoint cannot be taken.
                            throw new RuntimeException(
                                    "Unexpected non-terminal status: " + jobStatus.getState());
                        }
                        break;
                    case LAST_STATE:
                    default:
                        throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
                }
            }
        } else {
            LOG.debug("Job is in terminal state, skipping cancel");
        }

        // Reached only when no exception was thrown above: mark the job as finished and record
        // the savepoint taken during the stop, if any.
        jobStatus.setState(JobStatus.FINISHED.name());
        savepointOpt.ifPresent(
                location -> {
                    Savepoint sp = Savepoint.of(location, SnapshotTriggerType.UPGRADE);
                    jobStatus.getSavepointInfo().updateLastSavepoint(sp);
                });
    }