in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java [145:174]
/**
 * Validates the {@code flinkVersion} setting of the given deployment.
 *
 * <p>Checks are applied in order and the first failure wins:
 *
 * <ol>
 *   <li>the version must be set;
 *   <li>the version must pass {@code FlinkVersion.isSupported};
 *   <li>when both the last reconciled spec and the current spec carry a job, and the current
 *       job's upgrade mode is not {@code STATELESS}, the version must not differ from the last
 *       reconciled one if that last reconciled job was {@code SUSPENDED} with upgrade mode
 *       {@code LAST_STATE}.
 * </ol>
 *
 * @param deployment the deployment whose spec and reconciliation status are inspected
 * @return a human-readable error message for the first violated rule, or an empty {@link
 *     Optional} when no rule is violated
 */
private Optional<String> validateFlinkVersion(FlinkDeployment deployment) {
    var currentSpec = deployment.getSpec();
    var requestedVersion = currentSpec.getFlinkVersion();

    if (requestedVersion == null) {
        return Optional.of("Flink Version must be defined.");
    }
    if (!FlinkVersion.isSupported(requestedVersion)) {
        return Optional.of(
                "Flink version "
                        + requestedVersion
                        + " is not supported by this operator version");
    }

    var previousSpec =
            deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();

    // Without a previously reconciled job there is nothing to compare the version against.
    if (previousSpec == null) {
        return Optional.empty();
    }
    var previousJob = previousSpec.getJob();
    if (previousJob == null) {
        return Optional.empty();
    }

    // The version-change restriction is skipped when the current spec has no job or
    // requests a stateless upgrade.
    var currentJob = currentSpec.getJob();
    if (currentJob == null || currentJob.getUpgradeMode() == UpgradeMode.STATELESS) {
        return Optional.empty();
    }

    boolean suspendedWithLastState =
            previousJob.getState() == JobState.SUSPENDED
                    && previousJob.getUpgradeMode() == UpgradeMode.LAST_STATE;
    boolean versionChanged =
            suspendedWithLastState && previousSpec.getFlinkVersion() != requestedVersion;

    if (versionChanged) {
        return Optional.of(
                "Changing flinkVersion after last-state suspend is not allowed. Restore your cluster with the current flinkVersion and perform the version upgrade afterwards.");
    }
    return Optional.empty();
}