private boolean shouldRollBack()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java [386:438]


    /**
     * Decides whether the resource should be rolled back to its last stable spec.
     *
     * <p>The checks run in this order:
     *
     * <ol>
     *   <li>A resource already in {@code ROLLING_BACK} state keeps rolling back.
     *   <li>No rollback if the feature is disabled, the resource is already {@code ROLLED_BACK},
     *       or the last reconciled spec is marked stable.
     *   <li>No rollback if there is no last stable spec, if that spec has a suspended job, or if
     *       the Flink version differs between the current and the last stable spec.
     *   <li>No rollback until the configured readiness timeout has elapsed since the last
     *       reconciliation timestamp.
     *   <li>Roll back if the current job uses {@code SAVEPOINT} upgrade mode and the JobManager
     *       pod never started; otherwise roll back only if HA metadata is available.
     * </ol>
     *
     * @param ctx context giving access to the resource, the JOSDK context and the Flink service
     * @param configuration configuration used to read the rollback and readiness options
     * @return {@code true} if a rollback should be performed, {@code false} otherwise
     */
    private boolean shouldRollBack(FlinkResourceContext<CR> ctx, Configuration configuration) {
        var cr = ctx.getResource();
        var status = cr.getStatus().getReconciliationStatus();
        var currentState = status.getState();

        // A rollback that has already begun must be carried on.
        if (currentState == ReconciliationState.ROLLING_BACK) {
            return true;
        }

        // Feature switch comes first, exactly as in the combined condition it replaces.
        boolean rollbackEnabled =
                configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED);
        if (!rollbackEnabled) {
            return false;
        }
        if (currentState == ReconciliationState.ROLLED_BACK) {
            return false;
        }
        if (status.isLastReconciledSpecStable()) {
            return false;
        }

        var stableSpec = status.deserializeLastStableSpec();
        if (stableSpec == null) {
            // No stable spec recorded yet, hence no rollback target
            return false;
        }

        var stableJob = stableSpec.getJob();
        if (stableJob != null && stableJob.getState() == JobState.SUSPENDED) {
            // A suspended job is not a valid rollback target
            return false;
        }

        if (flinkVersionChanged(cr.getSpec(), stableSpec)) {
            // Flink version changes are never rolled back
            return false;
        }

        // Roll back only once the readiness timeout has fully elapsed since the last
        // reconciliation, i.e. (now - timeout) is strictly after the reconciliation timestamp.
        var readinessTimeout =
                configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_READINESS_TIMEOUT);
        var readinessCutoff = clock.instant().minus(readinessTimeout);
        var lastReconciledAt = Instant.ofEpochMilli(status.getReconciliationTimestamp());
        boolean readinessTimeoutElapsed = readinessCutoff.isAfter(lastReconciledAt);
        if (!readinessTimeoutElapsed) {
            return false;
        }

        var currentJob = cr.getSpec().getJob();
        if (currentJob != null
                && currentJob.getUpgradeMode() == UpgradeMode.SAVEPOINT
                && FlinkUtils.jmPodNeverStarted(ctx.getJosdkContext())) {
            // The JM pod never started, so HA data is not expected to exist; with SAVEPOINT
            // upgrade mode the rollback can rely on the savepoint instead.
            return true;
        }

        // Otherwise a rollback is only possible when HA metadata is available.
        if (ctx.getFlinkService().isHaMetadataAvailable(configuration)) {
            return true;
        }
        LOG.warn("Rollback is not possible due to missing HA metadata");
        return false;
    }