in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java [414:465]
/**
 * Checks whether moving from the last reconciled spec to the currently requested spec is an
 * allowed change.
 *
 * <p>Nothing is validated while the resource is still before its first deployment. Afterwards
 * the following transitions are rejected, in this order:
 *
 * <ol>
 *   <li>adding a job section to a spec that had none, or removing it from one that had it;
 *   <li>changing the deployment mode (an unset mode is treated as {@code NATIVE});
 *   <li>changing the savepoint redeploy nonce to a new non-null value without providing a
 *       non-blank initial savepoint path.
 * </ol>
 *
 * @param deployment resource carrying both the requested spec and the reconciliation status
 * @param effectiveConfig effective configuration of the deployment; not read by this method
 * @return an error message describing the first violated rule, or an empty {@link Optional}
 *     when the change is acceptable
 */
private Optional<String> validateSpecChange(
        FlinkDeployment deployment, Map<String, String> effectiveConfig) {
    final FlinkDeploymentSpec requestedSpec = deployment.getSpec();

    // Without a previous deployment there is no earlier spec to compare against.
    if (deployment.getStatus().getReconciliationStatus().isBeforeFirstDeployment()) {
        return Optional.empty();
    }

    final FlinkDeploymentSpec reconciledSpec =
            deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();

    final JobSpec reconciledJob = reconciledSpec.getJob();
    final JobSpec requestedJob = requestedSpec.getJob();

    // The presence of a job section must not change between the two specs.
    if (reconciledJob == null && requestedJob != null) {
        return Optional.of("Cannot switch from session to job cluster");
    }
    if (reconciledJob != null && requestedJob == null) {
        return Optional.of("Cannot switch from job to session cluster");
    }

    // An unset mode falls back to NATIVE on both sides before comparing.
    KubernetesDeploymentMode reconciledMode = reconciledSpec.getMode();
    if (reconciledMode == null) {
        reconciledMode = KubernetesDeploymentMode.NATIVE;
    }
    KubernetesDeploymentMode requestedMode = requestedSpec.getMode();
    if (requestedMode == null) {
        requestedMode = KubernetesDeploymentMode.NATIVE;
    }

    if (reconciledMode != requestedMode) {
        if (reconciledMode == KubernetesDeploymentMode.NATIVE) {
            return Optional.of(
                    "Cannot switch from native kubernetes to standalone kubernetes cluster");
        }
        if (reconciledMode == KubernetesDeploymentMode.STANDALONE) {
            return Optional.of(
                    "Cannot switch from standalone kubernetes to native kubernetes cluster");
        }
    }

    // The remaining rule only applies when both specs define a job.
    if (reconciledJob == null || requestedJob == null) {
        return Optional.empty();
    }

    // A redeploy is requested when the new nonce is set and differs from the reconciled one.
    final boolean redeployRequested =
            requestedJob.getSavepointRedeployNonce() != null
                    && !requestedJob
                            .getSavepointRedeployNonce()
                            .equals(reconciledJob.getSavepointRedeployNonce());

    if (redeployRequested
            && StringUtils.isNullOrWhitespaceOnly(requestedJob.getInitialSavepointPath())) {
        return Optional.of("InitialSavepointPath must not be empty for savepoint redeployment");
    }

    return Optional.empty();
}