private Optional validateSpecChange()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java [394:452]


    private Optional<String> validateSpecChange(
            FlinkDeployment deployment, Map<String, String> effectiveConfig) {
        FlinkDeploymentSpec newSpec = deployment.getSpec();

        if (deployment.getStatus().getReconciliationStatus().isBeforeFirstDeployment()) {
            if (newSpec.getJob() != null && !newSpec.getJob().getState().equals(JobState.RUNNING)) {
                return Optional.of("Job must start in running state");
            }

            return Optional.empty();
        }

        FlinkDeploymentSpec oldSpec =
                deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();

        if (newSpec.getJob() != null && oldSpec.getJob() == null) {
            return Optional.of("Cannot switch from session to job cluster");
        }

        if (newSpec.getJob() == null && oldSpec.getJob() != null) {
            return Optional.of("Cannot switch from job to session cluster");
        }

        KubernetesDeploymentMode oldDeploymentMode =
                oldSpec.getMode() == null ? KubernetesDeploymentMode.NATIVE : oldSpec.getMode();

        KubernetesDeploymentMode newDeploymentMode =
                newSpec.getMode() == null ? KubernetesDeploymentMode.NATIVE : newSpec.getMode();

        if (oldDeploymentMode == KubernetesDeploymentMode.NATIVE
                && newDeploymentMode != KubernetesDeploymentMode.NATIVE) {
            return Optional.of(
                    "Cannot switch from native kubernetes to standalone kubernetes cluster");
        }

        if (oldDeploymentMode == KubernetesDeploymentMode.STANDALONE
                && newDeploymentMode != KubernetesDeploymentMode.STANDALONE) {
            return Optional.of(
                    "Cannot switch from standalone kubernetes to native kubernetes cluster");
        }

        JobSpec oldJob = oldSpec.getJob();
        JobSpec newJob = newSpec.getJob();
        if (oldJob != null && newJob != null) {
            if (StringUtils.isNullOrWhitespaceOnly(
                            effectiveConfig.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key()))
                    && deployment.getStatus().getJobManagerDeploymentStatus()
                            != JobManagerDeploymentStatus.MISSING
                    && ReconciliationUtils.isUpgradeModeChangedToLastStateAndHADisabledPreviously(
                            deployment, configManager.getObserveConfig(deployment))) {
                return Optional.of(
                        String.format(
                                "Job could not be upgraded to last-state while config key[%s] is not set",
                                CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
            }
        }

        return Optional.empty();
    }