private boolean shouldRollBack()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java [383:442]


    private boolean shouldRollBack(
            FlinkResourceContext<CR> ctx, boolean specChanged, SPEC lastReconciledSpec) {

        var resource = ctx.getResource();
        var reconciliationStatus = resource.getStatus().getReconciliationStatus();
        var configuration = ctx.getObserveConfig();

        if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
            return true;
        }

        if (specChanged) {
            return false;
        }

        if (!configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED)
                || reconciliationStatus.getState() == ReconciliationState.ROLLED_BACK
                || reconciliationStatus.isLastReconciledSpecStable()) {
            return false;
        }

        var lastStableSpec = reconciliationStatus.deserializeLastStableSpec();
        if (lastStableSpec == null) {
            // Nothing to roll back to yet
            return false;
        }

        if (lastStableSpec.getJob() != null
                && lastStableSpec.getJob().getState() == JobState.SUSPENDED) {
            // Should not roll back to suspended state
            return false;
        }

        if (flinkVersionChanged(resource.getSpec(), lastStableSpec)) {
            // Should not roll back Flink version changes
            return false;
        }

        Duration readinessTimeout =
                configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_READINESS_TIMEOUT);
        if (!clock.instant()
                .minus(readinessTimeout)
                .isAfter(Instant.ofEpochMilli(reconciliationStatus.getReconciliationTimestamp()))) {
            return false;
        }

        if (lastReconciledSpec.getJob() != null
                && lastReconciledSpec.getJob().getUpgradeMode() == UpgradeMode.SAVEPOINT
                && FlinkUtils.jmPodNeverStarted(ctx.getJosdkContext())) {
            // HA data not available as JM never start and relying on SAVEPOINT upgrade mode
            // Safe to rollback relying on savepoint
            return true;
        }

        var haDataAvailable = ctx.getFlinkService().isHaMetadataAvailable(configuration);
        if (!haDataAvailable) {
            LOG.warn("Rollback is not possible due to missing HA metadata");
        }
        return haDataAvailable;
    }