in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java [154:198]
/**
 * Decides which upgrade mode, if any, can currently be used for the resource held by the
 * given context.
 *
 * <p>The decision is made in this order:
 *
 * <ol>
 *   <li>A spec requesting {@code STATELESS} is always upgradeable as {@code STATELESS}.
 *   <li>A job in a terminal state for which no HA metadata is available is upgraded via
 *       {@code SAVEPOINT}.
 *   <li>A running job is upgraded via {@code SAVEPOINT} when the mode was switched to
 *       last-state without HA previously enabled, or when the Flink version changed. Otherwise
 *       a {@code LAST_STATE} request is delegated to {@code
 *       changeLastStateIfCheckpointTooOld}, and any remaining request resolves to {@code
 *       SAVEPOINT}.
 *   <li>In every other situation no upgrade mode is available.
 * </ol>
 *
 * @param ctx context giving access to the resource, the Flink service and the observe config
 * @param deployConfig configuration forwarded to the last-state checkpoint age check
 * @return the upgrade mode to use, or an unavailable marker when the job is not ready
 * @throws Exception propagated from the underlying service and helper calls
 */
protected AvailableUpgradeMode getAvailableUpgradeMode(
        FlinkResourceContext<CR> ctx, Configuration deployConfig) throws Exception {
    var cr = ctx.getResource();
    var currentStatus = cr.getStatus();
    var requestedMode = cr.getSpec().getJob().getUpgradeMode();

    // Stateless upgrades never depend on job state.
    if (requestedMode == UpgradeMode.STATELESS) {
        LOG.info("Stateless job, ready for upgrade");
        return AvailableUpgradeMode.of(UpgradeMode.STATELESS);
    }

    var service = ctx.getFlinkService();

    // The HA metadata lookup is only performed once the job is known to be terminal.
    boolean terminalWithoutHaMetadata =
            ReconciliationUtils.isJobInTerminalState(currentStatus)
                    && !service.isHaMetadataAvailable(ctx.getObserveConfig());
    if (terminalWithoutHaMetadata) {
        LOG.info(
                "Job is in terminal state, ready for upgrade from observed latest checkpoint/savepoint");
        return AvailableUpgradeMode.of(UpgradeMode.SAVEPOINT);
    }

    // Anything that is not running at this point cannot be upgraded yet.
    if (!ReconciliationUtils.isJobRunning(currentStatus)) {
        return AvailableUpgradeMode.unavailable();
    }

    LOG.info("Job is in running state, ready for upgrade with {}", requestedMode);

    if (ReconciliationUtils.isUpgradeModeChangedToLastStateAndHADisabledPreviously(
            cr, ctx.getObserveConfig())) {
        LOG.info(
                "Using savepoint upgrade mode when switching to last-state without HA previously enabled");
        return AvailableUpgradeMode.of(UpgradeMode.SAVEPOINT);
    }

    if (flinkVersionChanged(ReconciliationUtils.getDeployedSpec(cr), cr.getSpec())) {
        LOG.info("Using savepoint upgrade mode when upgrading Flink version");
        return AvailableUpgradeMode.of(UpgradeMode.SAVEPOINT);
    }

    return requestedMode == UpgradeMode.LAST_STATE
            ? changeLastStateIfCheckpointTooOld(ctx, deployConfig)
            : AvailableUpgradeMode.of(UpgradeMode.SAVEPOINT);
}