private Optional<String> validateFlinkVersion(FlinkDeployment deployment)

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java [128:151]


    /**
     * Validates the {@code flinkVersion} setting of the given deployment.
     *
     * <p>Two conditions are rejected:
     *
     * <ul>
     *   <li>the spec has no Flink version set;
     *   <li>the last reconciled spec holds a job in {@code SUSPENDED} state with {@code
     *       LAST_STATE} upgrade mode, the current spec also defines a job whose upgrade mode is
     *       not {@code STATELESS}, and the Flink version of the current spec differs from the
     *       one in the last reconciled spec.
     * </ul>
     *
     * @param deployment the deployment whose spec and reconciliation status are inspected
     * @return an error message when one of the conditions above is hit, otherwise an empty
     *     {@link Optional}
     */
    private Optional<String> validateFlinkVersion(FlinkDeployment deployment) {
        var desiredSpec = deployment.getSpec();
        var desiredVersion = desiredSpec.getFlinkVersion();
        if (desiredVersion == null) {
            return Optional.of("Flink Version must be defined.");
        }

        var previousSpec =
                deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
        // Without a previously reconciled spec there is nothing to compare against.
        if (previousSpec == null) {
            return Optional.empty();
        }

        var previousJob = previousSpec.getJob();
        if (previousJob == null) {
            return Optional.empty();
        }

        var desiredJob = desiredSpec.getJob();
        if (desiredJob == null || desiredJob.getUpgradeMode() == UpgradeMode.STATELESS) {
            return Optional.empty();
        }

        // The version check only applies to a job that was suspended in last-state mode.
        if (previousJob.getState() != JobState.SUSPENDED
                || previousJob.getUpgradeMode() != UpgradeMode.LAST_STATE) {
            return Optional.empty();
        }

        if (previousSpec.getFlinkVersion() == desiredVersion) {
            return Optional.empty();
        }

        return Optional.of(
                "Changing flinkVersion after last-state suspend is not allowed. Restore your cluster with the current flinkVersion and perform the version upgrade afterwards.");
    }