in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java [394:452]
private Optional<String> validateSpecChange(
FlinkDeployment deployment, Map<String, String> effectiveConfig) {
FlinkDeploymentSpec newSpec = deployment.getSpec();
if (deployment.getStatus().getReconciliationStatus().isBeforeFirstDeployment()) {
if (newSpec.getJob() != null && !newSpec.getJob().getState().equals(JobState.RUNNING)) {
return Optional.of("Job must start in running state");
}
return Optional.empty();
}
FlinkDeploymentSpec oldSpec =
deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
if (newSpec.getJob() != null && oldSpec.getJob() == null) {
return Optional.of("Cannot switch from session to job cluster");
}
if (newSpec.getJob() == null && oldSpec.getJob() != null) {
return Optional.of("Cannot switch from job to session cluster");
}
KubernetesDeploymentMode oldDeploymentMode =
oldSpec.getMode() == null ? KubernetesDeploymentMode.NATIVE : oldSpec.getMode();
KubernetesDeploymentMode newDeploymentMode =
newSpec.getMode() == null ? KubernetesDeploymentMode.NATIVE : newSpec.getMode();
if (oldDeploymentMode == KubernetesDeploymentMode.NATIVE
&& newDeploymentMode != KubernetesDeploymentMode.NATIVE) {
return Optional.of(
"Cannot switch from native kubernetes to standalone kubernetes cluster");
}
if (oldDeploymentMode == KubernetesDeploymentMode.STANDALONE
&& newDeploymentMode != KubernetesDeploymentMode.STANDALONE) {
return Optional.of(
"Cannot switch from standalone kubernetes to native kubernetes cluster");
}
JobSpec oldJob = oldSpec.getJob();
JobSpec newJob = newSpec.getJob();
if (oldJob != null && newJob != null) {
if (StringUtils.isNullOrWhitespaceOnly(
effectiveConfig.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key()))
&& deployment.getStatus().getJobManagerDeploymentStatus()
!= JobManagerDeploymentStatus.MISSING
&& ReconciliationUtils.isUpgradeModeChangedToLastStateAndHADisabledPreviously(
deployment, configManager.getObserveConfig(deployment))) {
return Optional.of(
String.format(
"Job could not be upgraded to last-state while config key[%s] is not set",
CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
}
}
return Optional.empty();
}