protected boolean reconcileSpecChange()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java [111:210]


    protected boolean reconcileSpecChange(
            DiffType diffType,
            FlinkResourceContext<CR> ctx,
            Configuration deployConfig,
            SPEC lastReconciledSpec)
            throws Exception {

        var resource = ctx.getResource();
        STATUS status = resource.getStatus();
        SPEC currentDeploySpec = resource.getSpec();

        JobState currentJobState = lastReconciledSpec.getJob().getState();
        JobState desiredJobState = currentDeploySpec.getJob().getState();

        if (diffType == DiffType.SAVEPOINT_REDEPLOY) {
            redeployWithSavepoint(
                    ctx, deployConfig, resource, status, currentDeploySpec, desiredJobState);
            return true;
        }

        if (currentJobState == JobState.RUNNING) {
            var jobUpgrade = getJobUpgrade(ctx, deployConfig);
            if (!jobUpgrade.isAvailable()) {
                // If job upgrade is currently not available for some reason we must still check if
                // other reconciliation action may be taken while we wait...
                LOG.info(
                        "Job is not running and checkpoint information is not available for executing the upgrade, waiting for upgradeable state");
                return !jobUpgrade.allowOtherReconcileActions;
            }
            LOG.debug("Job upgrade available: {}", jobUpgrade);

            var suspendMode = jobUpgrade.getSuspendMode();
            if (suspendMode != SuspendMode.NOOP) {
                eventRecorder.triggerEvent(
                        resource,
                        EventRecorder.Type.Normal,
                        EventRecorder.Reason.Suspended,
                        EventRecorder.Component.JobManagerDeployment,
                        MSG_SUSPENDED,
                        ctx.getKubernetesClient());
            }

            boolean async = cancelJob(ctx, suspendMode);
            if (async) {
                // Async cancellation will be completed in the background, so we must exit
                // reconciliation early and wait until it completes to finish the upgrade.
                resource.getStatus()
                        .getReconciliationStatus()
                        .setState(ReconciliationState.UPGRADING);
                ReconciliationUtils.updateLastReconciledSpec(
                        resource,
                        (s, m) -> {
                            s.getJob().setUpgradeMode(jobUpgrade.getRestoreMode());
                            m.setFirstDeployment(false);
                        });
                return true;
            }

            // We must record the upgrade mode used to the status later
            currentDeploySpec.getJob().setUpgradeMode(jobUpgrade.getRestoreMode());

            if (desiredJobState == JobState.RUNNING) {
                ReconciliationUtils.updateStatusBeforeDeploymentAttempt(
                        resource, deployConfig, clock);
            } else {
                ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConfig, clock);
            }

            if (suspendMode == SuspendMode.NOOP) {
                // If already cancelled we want to restore immediately so we modify the current
                // state
                // We don't do this when we actually performed a potentially lengthy cancel action
                // to allow reconciling the spec
                lastReconciledSpec.getJob().setUpgradeMode(jobUpgrade.getRestoreMode());
                currentJobState = JobState.SUSPENDED;
            }
        }

        if (currentJobState == JobState.SUSPENDED && desiredJobState == JobState.RUNNING) {
            // We inherit the upgrade mode unless stateless upgrade requested
            if (currentDeploySpec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) {
                currentDeploySpec
                        .getJob()
                        .setUpgradeMode(lastReconciledSpec.getJob().getUpgradeMode());
            }
            // We record the target spec into an upgrading state before deploying
            ReconciliationUtils.updateStatusBeforeDeploymentAttempt(resource, deployConfig, clock);
            statusRecorder.patchAndCacheStatus(resource, ctx.getKubernetesClient());

            restoreJob(
                    ctx,
                    currentDeploySpec,
                    deployConfig,
                    // We decide to enforce HA based on how job was previously suspended
                    lastReconciledSpec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE);

            ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConfig, clock);
        }
        return true;
    }