protected void cancelJob()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java [267:374]


    /**
     * Stops the job of the given deployment according to the requested upgrade mode and updates
     * the deployment status to reflect the result.
     *
     * <ul>
     *   <li>{@code STATELESS}: if the job is running it is cancelled through the cluster client,
     *       waiting at most the operator's configured cancel timeout. A failed cancel is logged
     *       and otherwise ignored; the cluster deployment is deleted in either case.
     *   <li>{@code SAVEPOINT}: if the job is running it is stopped with a savepoint written to
     *       the configured savepoint directory, waiting at most the configured checkpointing
     *       timeout. If the job is already in a terminal state the stop is skipped. The cluster
     *       deployment is deleted only when {@code deleteClusterAfterSavepoint} is set.
     *   <li>{@code LAST_STATE}: the cluster deployment is deleted without contacting the job.
     * </ul>
     *
     * <p>Afterwards the job state is set to {@code FINISHED}, a savepoint taken by this call is
     * recorded as the last savepoint, and, unless cluster shutdown is disabled for the deployed
     * spec, the method waits for the cluster to shut down and marks the JobManager deployment
     * as {@code MISSING}.
     *
     * @param deployment the deployment whose job should be stopped; its status is mutated
     * @param upgradeMode how the job should be stopped
     * @param conf the configuration used to reach the cluster and to read the savepoint settings
     * @param deleteClusterAfterSavepoint whether the cluster deployment is deleted after the
     *     savepoint step in {@code SAVEPOINT} mode
     * @throws Exception if the stop-with-savepoint call times out (as a {@code FlinkException})
     *     or fails, if the job is neither running nor terminal in {@code SAVEPOINT} mode, if the
     *     upgrade mode is unsupported, or if any of the cluster operations fail
     */
    protected void cancelJob(
            FlinkDeployment deployment,
            UpgradeMode upgradeMode,
            Configuration conf,
            boolean deleteClusterAfterSavepoint)
            throws Exception {
        var deploymentStatus = deployment.getStatus();
        var jobIdString = deploymentStatus.getJobStatus().getJobId();
        // The job id may be absent; it is only required (and null-checked) on the paths that
        // actually talk to a running job.
        var jobId = jobIdString != null ? JobID.fromHexString(jobIdString) : null;

        Optional<String> savepointOpt = Optional.empty();
        var savepointFormatType = SnapshotUtils.getSavepointFormatType(conf);
        try (var clusterClient = getClusterClient(conf)) {
            var clusterId = clusterClient.getClusterId();
            switch (upgradeMode) {
                case STATELESS:
                    if (ReconciliationUtils.isJobRunning(deploymentStatus)) {
                        LOG.info("Job is running, cancelling job.");
                        try {
                            // Wait in milliseconds so that a sub-second part of the configured
                            // timeout is honoured instead of being truncated away.
                            clusterClient
                                    .cancel(Preconditions.checkNotNull(jobId))
                                    .get(
                                            operatorConfig.getFlinkCancelJobTimeout().toMillis(),
                                            TimeUnit.MILLISECONDS);
                            LOG.info("Job successfully cancelled.");
                        } catch (Exception e) {
                            // Best effort: the deployment is deleted below regardless.
                            // NOTE(review): this also absorbs InterruptedException without
                            // restoring the interrupt flag - confirm that is intended.
                            LOG.error("Could not shut down cluster gracefully, deleting...", e);
                        }
                    }
                    deleteClusterDeployment(deployment.getMetadata(), deploymentStatus, conf, true);
                    break;
                case SAVEPOINT:
                    final String savepointDirectory =
                            Preconditions.checkNotNull(
                                    conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY));
                    // Millisecond precision: a whole-second conversion would drop the
                    // sub-second part and turn a timeout below one second into a zero wait.
                    final long savepointTimeoutMs =
                            conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT)
                                    .toMillis();
                    if (ReconciliationUtils.isJobRunning(deploymentStatus)) {
                        try {
                            LOG.info("Suspending job with savepoint.");
                            // The format type is only passed for Flink versions newer than
                            // 1.14; older versions get null.
                            String savepoint =
                                    clusterClient
                                            .stopWithSavepoint(
                                                    Preconditions.checkNotNull(jobId),
                                                    false,
                                                    savepointDirectory,
                                                    conf.get(FLINK_VERSION)
                                                                    .isNewerVersionThan(
                                                                            FlinkVersion.v1_14)
                                                            ? savepointFormatType
                                                            : null)
                                            .get(savepointTimeoutMs, TimeUnit.MILLISECONDS);
                            savepointOpt = Optional.of(savepoint);
                            LOG.info("Job successfully suspended with savepoint {}.", savepoint);
                        } catch (TimeoutException exception) {
                            throw new FlinkException(
                                    String.format(
                                            "Timed out stopping the job %s in Flink cluster %s with savepoint, "
                                                    + "please configure a larger timeout via '%s'",
                                            jobId,
                                            clusterId,
                                            ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT
                                                    .key()),
                                    exception);
                        }
                    } else if (ReconciliationUtils.isJobInTerminalState(deploymentStatus)) {
                        LOG.info(
                                "Job is already in terminal state skipping cancel-with-savepoint operation.");
                    } else {
                        throw new RuntimeException(
                                "Unexpected non-terminal status: " + deploymentStatus);
                    }
                    if (deleteClusterAfterSavepoint) {
                        LOG.info("Cleaning up deployment after stop-with-savepoint");

                        deleteClusterDeployment(
                                deployment.getMetadata(), deploymentStatus, conf, true);
                    }
                    break;
                case LAST_STATE:
                    // Unlike the other modes, the last argument is false here.
                    // NOTE(review): presumably this keeps the metadata needed to restore
                    // from the last state - confirm against deleteClusterDeployment.
                    deleteClusterDeployment(
                            deployment.getMetadata(), deploymentStatus, conf, false);
                    break;
                default:
                    throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
            }
        }
        deploymentStatus.getJobStatus().setState(JobStatus.FINISHED.name());
        // Record the savepoint taken above (if any) as the latest upgrade savepoint.
        savepointOpt.ifPresent(
                location -> {
                    Savepoint sp =
                            Savepoint.of(
                                    location,
                                    SnapshotTriggerType.UPGRADE,
                                    SavepointFormatType.valueOf(savepointFormatType.name()));
                    deploymentStatus.getJobStatus().getSavepointInfo().updateLastSavepoint(sp);
                });

        // LAST_STATE always waits for shutdown; the other modes skip the wait when cluster
        // shutdown is disabled for the deployed spec.
        var shutdownDisabled =
                upgradeMode != UpgradeMode.LAST_STATE
                        && FlinkUtils.clusterShutdownDisabled(
                                ReconciliationUtils.getDeployedSpec(deployment));
        if (!shutdownDisabled) {
            waitForClusterShutdown(conf);
            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
        }
    }