private Optional validateSpecChange()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java [414:465]


    private Optional<String> validateSpecChange(
            FlinkDeployment deployment, Map<String, String> effectiveConfig) {
        FlinkDeploymentSpec newSpec = deployment.getSpec();

        if (deployment.getStatus().getReconciliationStatus().isBeforeFirstDeployment()) {
            return Optional.empty();
        }

        FlinkDeploymentSpec oldSpec =
                deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();

        if (newSpec.getJob() != null && oldSpec.getJob() == null) {
            return Optional.of("Cannot switch from session to job cluster");
        }

        if (newSpec.getJob() == null && oldSpec.getJob() != null) {
            return Optional.of("Cannot switch from job to session cluster");
        }

        KubernetesDeploymentMode oldDeploymentMode =
                oldSpec.getMode() == null ? KubernetesDeploymentMode.NATIVE : oldSpec.getMode();

        KubernetesDeploymentMode newDeploymentMode =
                newSpec.getMode() == null ? KubernetesDeploymentMode.NATIVE : newSpec.getMode();

        if (oldDeploymentMode == KubernetesDeploymentMode.NATIVE
                && newDeploymentMode != KubernetesDeploymentMode.NATIVE) {
            return Optional.of(
                    "Cannot switch from native kubernetes to standalone kubernetes cluster");
        }

        if (oldDeploymentMode == KubernetesDeploymentMode.STANDALONE
                && newDeploymentMode != KubernetesDeploymentMode.STANDALONE) {
            return Optional.of(
                    "Cannot switch from standalone kubernetes to native kubernetes cluster");
        }

        JobSpec oldJob = oldSpec.getJob();
        JobSpec newJob = newSpec.getJob();
        if (oldJob != null && newJob != null) {
            if (newJob.getSavepointRedeployNonce() != null
                    && !newJob.getSavepointRedeployNonce()
                            .equals(oldJob.getSavepointRedeployNonce())) {
                if (StringUtils.isNullOrWhitespaceOnly(newJob.getInitialSavepointPath())) {
                    return Optional.of(
                            "InitialSavepointPath must not be empty for savepoint redeployment");
                }
            }
        }

        return Optional.empty();
    }