in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java [212:301]
/**
 * Decides how the job of the given resource can be upgraded, based on the upgrade mode set in
 * its spec and the job state recorded in its status.
 *
 * <p>Side effect: when a savepoint-mode upgrade is requested but the job is not running, and
 * either the Flink version changed or the last-state fallback option is enabled, the upgrade
 * mode stored in the resource spec is overwritten with {@code LAST_STATE} before the last-state
 * handling is evaluated.
 *
 * @param ctx context supplying the resource, the Flink service and the observe configuration
 * @param deployConfig configuration read for the last-state fallback option and handed on when
 *     selecting the upgrade mode for a running last-state job
 * @return the selected {@link JobUpgrade}; {@code JobUpgrade.unavailable()} if no branch applies
 * @throws UpgradeFailureException if the job is cancelled (or terminal without HA metadata) and
 *     the last savepoint is not known
 * @throws Exception if any of the invoked checks throws
 */
protected JobUpgrade getJobUpgrade(FlinkResourceContext<CR> ctx, Configuration deployConfig)
        throws Exception {
    var cr = ctx.getResource();
    var jobStatus = cr.getStatus();
    var upgradeMode = cr.getSpec().getJob().getUpgradeMode();
    boolean inTerminalState = ReconciliationUtils.isJobInTerminalState(jobStatus);

    // Stateless upgrades carry no state, so nothing else needs to be inspected.
    if (upgradeMode == UpgradeMode.STATELESS) {
        LOG.info("Stateless job, ready for upgrade");
        return JobUpgrade.stateless(inTerminalState);
    }

    var service = ctx.getFlinkService();
    // The HA metadata lookup is only performed for terminal jobs that are not already cancelled.
    boolean restoreFromObservedState =
            ReconciliationUtils.isJobCancelled(jobStatus)
                    || (inTerminalState
                            && !service.isHaMetadataAvailable(ctx.getObserveConfig()));
    if (restoreFromObservedState) {
        if (SnapshotUtils.lastSavepointKnown(jobStatus)) {
            LOG.info("Job is in terminal state, ready for upgrade from observed latest state");
            return JobUpgrade.savepoint(true);
        }
        throw new UpgradeFailureException(
                "Job is in terminal state but last checkpoint is unknown, possibly due to an unrecoverable restore error. Manual restore required.",
                "UpgradeFailed");
    }

    if (ReconciliationUtils.isJobCancelling(jobStatus)) {
        LOG.info("Cancellation is in progress. Waiting for cancelled state.");
        return JobUpgrade.pendingCancellation();
    }

    boolean jobRunning = ReconciliationUtils.isJobRunning(jobStatus);
    boolean flinkVersionDiffers =
            flinkVersionChanged(ReconciliationUtils.getDeployedSpec(cr), cr.getSpec());

    if (upgradeMode == UpgradeMode.SAVEPOINT) {
        if (jobRunning) {
            LOG.info("Job is in running state, ready for upgrade with savepoint");
            return JobUpgrade.savepoint(false);
        }

        // The fallback option is only read when the version did not change.
        boolean fallbackToLastState =
                flinkVersionDiffers
                        || deployConfig.get(
                                KubernetesOperatorConfigOptions
                                        .OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED);
        if (!fallbackToLastState) {
            LOG.info("Last-state fallback is disabled, waiting for upgradable state");
            return JobUpgrade.pendingUpgrade();
        }

        LOG.info("Falling back to last-state upgrade mode from savepoint");
        var jobSpec = ctx.getResource().getSpec().getJob();
        // Update the local mode as well so the last-state handling below takes over.
        upgradeMode = UpgradeMode.LAST_STATE;
        jobSpec.setUpgradeMode(upgradeMode);
    }

    if (upgradeMode != UpgradeMode.LAST_STATE) {
        return JobUpgrade.unavailable();
    }

    if (flinkVersionDiffers) {
        // Version upgrades get dedicated handling because an HA based last-state upgrade
        // is not possible for them.
        var savepointDir =
                ctx.getObserveConfig().getString(CheckpointingOptions.SAVEPOINT_DIRECTORY);
        boolean savepointDirConfigured = !StringUtils.isNullOrWhitespaceOnly(savepointDir);

        if (jobRunning && savepointDirConfigured) {
            LOG.info("Using savepoint to upgrade Flink version");
            return JobUpgrade.savepoint(false);
        }
        if (ReconciliationUtils.isJobCancellable(cr.getStatus())) {
            LOG.info("Using last-state upgrade with cancellation to upgrade Flink version");
            return JobUpgrade.lastStateUsingCancel();
        }
        LOG.info(
                "Neither savepoint nor cancellation is possible, cannot perform stateful version upgrade");
        return JobUpgrade.pendingUpgrade();
    }

    boolean cancelAllowed = allowLastStateCancel(ctx);
    if (jobRunning) {
        var selected = getUpgradeModeBasedOnStateAge(ctx, deployConfig, cancelAllowed);
        LOG.info("Job is running, using {} for last-state upgrade", selected);
        return selected;
    }
    if (cancelAllowed) {
        LOG.info("Job is not running, using cancel to perform last-state upgrade");
        return JobUpgrade.lastStateUsingCancel();
    }

    return JobUpgrade.unavailable();
}