protected AvailableUpgradeMode getAvailableUpgradeMode()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java [75:128]


    /**
     * Resolves the upgrade mode that can currently be used for this application deployment.
     *
     * <p>The result computed by the superclass is consulted first and returned unchanged when it
     * is already available or when it does not allow a fallback. Otherwise the following checks
     * run in order:
     *
     * <ol>
     *   <li>If the last-state fallback option is enabled, HA is activated in both the deploy
     *       config and the observe config, the Flink version did not change between the deployed
     *       spec and the current spec, and HA metadata is available, then {@code LAST_STATE} is
     *       returned.
     *   <li>If the JobManager deployment status is not {@code MISSING}, the last reconciled spec
     *       does not use the {@code LAST_STATE} upgrade mode, and the JobManager pod never
     *       started, then {@code deleteJmThatNeverStarted} is invoked and this method is
     *       evaluated again.
     *   <li>If the JobManager deployment status is {@code MISSING} or {@code ERROR} and HA
     *       metadata is not available, a {@code RecoveryFailureException} is thrown.
     *   <li>Otherwise the upgrade is reported as unavailable.
     * </ol>
     *
     * @param ctx resource context giving access to the deployment, the Flink service, the observe
     *     config and the JOSDK context
     * @param deployConfig configuration used for the fallback option, the HA checks and the HA
     *     metadata lookup
     * @return the upgrade mode that is available right now, or an unavailable marker
     * @throws Exception if a stateful upgrade cannot be performed (see above) or if any of the
     *     invoked helpers fails
     */
    protected AvailableUpgradeMode getAvailableUpgradeMode(
            FlinkResourceContext<FlinkDeployment> ctx, Configuration deployConfig)
            throws Exception {

        var resource = ctx.getResource();
        var resourceStatus = resource.getStatus();

        // Start from the generic decision; only continue when it is neither usable nor final.
        var inherited = super.getAvailableUpgradeMode(ctx, deployConfig);
        boolean fallbackNeeded = !inherited.isAvailable() && inherited.isAllowFallback();
        if (!fallbackNeeded) {
            return inherited;
        }

        var service = ctx.getFlinkService();

        // The HA metadata lookup is deliberately the last operand so that it is only performed
        // when every cheaper precondition for a last-state fallback already holds.
        boolean lastStateFallbackEnabled =
                deployConfig.getBoolean(
                        KubernetesOperatorConfigOptions
                                .OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED);
        if (lastStateFallbackEnabled
                && HighAvailabilityMode.isHighAvailabilityModeActivated(deployConfig)
                && HighAvailabilityMode.isHighAvailabilityModeActivated(ctx.getObserveConfig())
                && !flinkVersionChanged(
                        ReconciliationUtils.getDeployedSpec(resource), resource.getSpec())
                && service.isHaMetadataAvailable(deployConfig)) {
            LOG.info(
                    "Job is not running but HA metadata is available for last state restore, ready for upgrade");
            return AvailableUpgradeMode.of(UpgradeMode.LAST_STATE);
        }

        var jmStatus = resourceStatus.getJobManagerDeploymentStatus();

        if (jmStatus != JobManagerDeploymentStatus.MISSING) {
            var lastReconciledUpgradeMode =
                    resourceStatus
                            .getReconciliationStatus()
                            .deserializeLastReconciledSpec()
                            .getJob()
                            .getUpgradeMode();
            if (lastReconciledUpgradeMode != UpgradeMode.LAST_STATE
                    && FlinkUtils.jmPodNeverStarted(ctx.getJosdkContext())) {
                // NOTE(review): the retry below presumably terminates because the cleanup changes
                // the observed JobManager state -- confirm against deleteJmThatNeverStarted.
                deleteJmThatNeverStarted(service, resource, deployConfig);
                return getAvailableUpgradeMode(ctx, deployConfig);
            }
        }

        boolean jmMissingOrFailed =
                jmStatus == JobManagerDeploymentStatus.MISSING
                        || jmStatus == JobManagerDeploymentStatus.ERROR;
        if (jmMissingOrFailed && !service.isHaMetadataAvailable(deployConfig)) {
            throw new RecoveryFailureException(
                    "JobManager deployment is missing and HA data is not available to make stateful upgrades. "
                            + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
                            + "Manual restore required.",
                    "UpgradeFailed");
        }

        LOG.info(
                "Job is not running and HA metadata is not available or usable for executing the upgrade, waiting for upgradeable state");
        return AvailableUpgradeMode.unavailable();
    }