protected JobUpgrade getJobUpgrade()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java [212:301]


    /**
     * Determines which kind of upgrade is currently possible for the job of the given resource,
     * based on the upgrade mode in its spec and its last observed status.
     *
     * <p>The checks are applied in the following order, the first match wins:
     *
     * <ol>
     *   <li>{@code STATELESS} upgrade mode: a stateless upgrade is returned right away.
     *   <li>The job is cancelled, or it is terminal and no HA metadata is available: the upgrade
     *       proceeds via {@code JobUpgrade.savepoint(true)} if {@link
     *       SnapshotUtils#lastSavepointKnown} holds, otherwise an {@link UpgradeFailureException}
     *       is thrown.
     *   <li>The job is cancelling: {@code JobUpgrade.pendingCancellation()} is returned.
     *   <li>{@code SAVEPOINT} upgrade mode: a running job is upgraded with {@code
     *       JobUpgrade.savepoint(false)}. A non-running job falls through to the last-state logic
     *       when the Flink version changed or the last-state fallback option is enabled in {@code
     *       deployConfig}; otherwise {@code JobUpgrade.pendingUpgrade()} is returned.
     *   <li>{@code LAST_STATE} upgrade mode: on a Flink version change a savepoint is used if the
     *       job is running and a savepoint directory is configured, else cancellation if the job
     *       is cancellable, else the upgrade stays pending. Without a version change the result
     *       comes from {@code getUpgradeModeBasedOnStateAge} for a running job, or from
     *       cancellation when {@code allowLastStateCancel} permits it.
     *   <li>Anything else: {@code JobUpgrade.unavailable()}.
     * </ol>
     *
     * <p>Side effect: when falling back from savepoint to last-state mode, the upgrade mode stored
     * in the resource spec is overwritten with {@code UpgradeMode.LAST_STATE}.
     *
     * @param ctx context giving access to the resource, the Flink service and the observe config
     * @param deployConfig configuration consulted for the last-state fallback option and passed
     *     on to {@code getUpgradeModeBasedOnStateAge}
     * @return the upgrade decision for the current state of the job
     * @throws UpgradeFailureException if the job has to be upgraded from its observed latest
     *     state but the last savepoint is not known
     * @throws Exception declared for the service and helper calls made by this method
     */
    protected JobUpgrade getJobUpgrade(FlinkResourceContext<CR> ctx, Configuration deployConfig)
            throws Exception {
        var cr = ctx.getResource();
        var jobStatus = cr.getStatus();
        var mode = cr.getSpec().getJob().getUpgradeMode();
        boolean inTerminalState = ReconciliationUtils.isJobInTerminalState(jobStatus);

        // Stateless jobs never need any state, so they can always be upgraded.
        if (UpgradeMode.STATELESS == mode) {
            LOG.info("Stateless job, ready for upgrade");
            return JobUpgrade.stateless(inTerminalState);
        }

        var service = ctx.getFlinkService();
        // The HA metadata lookup is only performed for terminal jobs that are not cancelled.
        boolean upgradeFromObservedState =
                ReconciliationUtils.isJobCancelled(jobStatus)
                        || (inTerminalState
                                && !service.isHaMetadataAvailable(ctx.getObserveConfig()));
        if (upgradeFromObservedState) {
            if (SnapshotUtils.lastSavepointKnown(jobStatus)) {
                LOG.info("Job is in terminal state, ready for upgrade from observed latest state");
                return JobUpgrade.savepoint(true);
            }
            throw new UpgradeFailureException(
                    "Job is in terminal state but last checkpoint is unknown, possibly due to an unrecoverable restore error. Manual restore required.",
                    "UpgradeFailed");
        }

        if (ReconciliationUtils.isJobCancelling(jobStatus)) {
            LOG.info("Cancellation is in progress. Waiting for cancelled state.");
            return JobUpgrade.pendingCancellation();
        }

        boolean isRunning = ReconciliationUtils.isJobRunning(jobStatus);
        boolean versionChanged =
                flinkVersionChanged(ReconciliationUtils.getDeployedSpec(cr), cr.getSpec());

        if (UpgradeMode.SAVEPOINT == mode) {
            if (isRunning) {
                LOG.info("Job is in running state, ready for upgrade with savepoint");
                return JobUpgrade.savepoint(false);
            }

            // The fallback option is only read when the Flink version did not change.
            boolean fallbackToLastState =
                    versionChanged
                            || deployConfig.get(
                                    KubernetesOperatorConfigOptions
                                            .OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED);
            if (!fallbackToLastState) {
                LOG.info("Last-state fallback is disabled, waiting for upgradable state");
                return JobUpgrade.pendingUpgrade();
            }

            LOG.info("Falling back to last-state upgrade mode from savepoint");
            var jobSpec = ctx.getResource().getSpec().getJob();
            mode = UpgradeMode.LAST_STATE;
            // Record the fallback in the spec and continue with the last-state logic below.
            jobSpec.setUpgradeMode(mode);
        }

        if (UpgradeMode.LAST_STATE != mode) {
            return JobUpgrade.unavailable();
        }

        if (versionChanged) {
            // We need some special handling in case of version upgrades where HA based
            // last-state upgrade is not possible
            String savepointDir =
                    ctx.getObserveConfig().getString(CheckpointingOptions.SAVEPOINT_DIRECTORY);
            boolean canSavepoint = !StringUtils.isNullOrWhitespaceOnly(savepointDir);

            if (isRunning && canSavepoint) {
                LOG.info("Using savepoint to upgrade Flink version");
                return JobUpgrade.savepoint(false);
            }
            if (ReconciliationUtils.isJobCancellable(cr.getStatus())) {
                LOG.info("Using last-state upgrade with cancellation to upgrade Flink version");
                return JobUpgrade.lastStateUsingCancel();
            }
            LOG.info(
                    "Neither savepoint nor cancellation is possible, cannot perform stateful version upgrade");
            return JobUpgrade.pendingUpgrade();
        }

        boolean lastStateCancelAllowed = allowLastStateCancel(ctx);
        if (isRunning) {
            var chosen = getUpgradeModeBasedOnStateAge(ctx, deployConfig, lastStateCancelAllowed);
            LOG.info("Job is running, using {} for last-state upgrade", chosen);
            return chosen;
        }

        if (!lastStateCancelAllowed) {
            return JobUpgrade.unavailable();
        }
        LOG.info("Job is not running, using cancel to perform last-state upgrade");
        return JobUpgrade.lastStateUsingCancel();
    }