protected JobUpgrade getJobUpgrade()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java [79:123]


    protected JobUpgrade getJobUpgrade(
            FlinkResourceContext<FlinkDeployment> ctx, Configuration deployConfig)
            throws Exception {

        var deployment = ctx.getResource();
        var status = deployment.getStatus();
        var availableUpgradeMode = super.getJobUpgrade(ctx, deployConfig);

        if (availableUpgradeMode.isAvailable() || !availableUpgradeMode.isAllowFallback()) {
            return availableUpgradeMode;
        }
        var flinkService = ctx.getFlinkService();

        if (HighAvailabilityMode.isHighAvailabilityModeActivated(deployConfig)
                && HighAvailabilityMode.isHighAvailabilityModeActivated(ctx.getObserveConfig())
                && flinkService.isHaMetadataAvailable(deployConfig)) {
            LOG.info(
                    "Job is not running but HA metadata is available for last state restore, ready for upgrade");
            return JobUpgrade.lastStateUsingHaMeta();
        }

        var jmDeployStatus = status.getJobManagerDeploymentStatus();
        if (jmDeployStatus != JobManagerDeploymentStatus.MISSING
                && status.getReconciliationStatus()
                                .deserializeLastReconciledSpec()
                                .getJob()
                                .getUpgradeMode()
                        != UpgradeMode.LAST_STATE
                && FlinkUtils.jmPodNeverStarted(ctx.getJosdkContext())) {
            deleteJmThatNeverStarted(flinkService, deployment, deployConfig);
            return getJobUpgrade(ctx, deployConfig);
        }

        if ((jmDeployStatus == JobManagerDeploymentStatus.MISSING
                        || jmDeployStatus == JobManagerDeploymentStatus.ERROR)
                && !flinkService.isHaMetadataAvailable(deployConfig)) {
            throw new UpgradeFailureException(
                    "JobManager deployment is missing and HA data is not available to make stateful upgrades. "
                            + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
                            + "Manual restore required.",
                    "UpgradeFailed");
        }

        return JobUpgrade.unavailable();
    }