in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java [75:128]
protected AvailableUpgradeMode getAvailableUpgradeMode(
        FlinkResourceContext<FlinkDeployment> ctx, Configuration deployConfig)
        throws Exception {
    // Decides which upgrade mode (if any) can be used for the deployment right now.
    // The parent implementation is consulted first; the application-specific checks
    // below only run when the parent reports "not available, but fallback allowed".
    var resource = ctx.getResource();
    var resourceStatus = resource.getStatus();

    var baseMode = super.getAvailableUpgradeMode(ctx, deployConfig);
    boolean fallbackWorthChecking = !baseMode.isAvailable() && baseMode.isAllowFallback();
    if (!fallbackWorthChecking) {
        return baseMode;
    }

    var service = ctx.getFlinkService();

    // Last-state fallback: requires the fallback option to be enabled, HA to be active in
    // both the deploy and observe configs, an unchanged Flink version between the deployed
    // and the current spec, and HA metadata to actually be present. The HA metadata lookup
    // is deliberately the last operand so it is only performed when everything else holds.
    if (deployConfig.getBoolean(
                    KubernetesOperatorConfigOptions
                            .OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED)
            && HighAvailabilityMode.isHighAvailabilityModeActivated(deployConfig)
            && HighAvailabilityMode.isHighAvailabilityModeActivated(ctx.getObserveConfig())
            && !flinkVersionChanged(
                    ReconciliationUtils.getDeployedSpec(resource), resource.getSpec())
            && service.isHaMetadataAvailable(deployConfig)) {
        LOG.info(
                "Job is not running but HA metadata is available for last state restore, ready for upgrade");
        return AvailableUpgradeMode.of(UpgradeMode.LAST_STATE);
    }

    var jmStatus = resourceStatus.getJobManagerDeploymentStatus();

    // A JobManager deployment that is not MISSING, whose last reconciled spec did not use
    // LAST_STATE, and whose pod never started is deleted; the decision is then re-evaluated
    // from scratch. The last reconciled spec is only deserialized when the status is not
    // MISSING.
    if (jmStatus != JobManagerDeploymentStatus.MISSING) {
        var lastReconciledMode =
                resourceStatus
                        .getReconciliationStatus()
                        .deserializeLastReconciledSpec()
                        .getJob()
                        .getUpgradeMode();
        if (lastReconciledMode != UpgradeMode.LAST_STATE
                && FlinkUtils.jmPodNeverStarted(ctx.getJosdkContext())) {
            deleteJmThatNeverStarted(service, resource, deployConfig);
            return getAvailableUpgradeMode(ctx, deployConfig);
        }
    }

    // With the JobManager deployment MISSING or in ERROR and no HA metadata to restore from,
    // the upgrade cannot proceed automatically and the failure is surfaced to the caller.
    boolean jmMissingOrFailed =
            jmStatus == JobManagerDeploymentStatus.MISSING
                    || jmStatus == JobManagerDeploymentStatus.ERROR;
    if (jmMissingOrFailed && !service.isHaMetadataAvailable(deployConfig)) {
        throw new RecoveryFailureException(
                "JobManager deployment is missing and HA data is not available to make stateful upgrades. "
                        + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
                        + "Manual restore required.",
                "UpgradeFailed");
    }

    // Nothing usable yet: report "unavailable" so the caller can wait for an upgradeable state.
    LOG.info(
            "Job is not running and HA metadata is not available or usable for executing the upgrade, waiting for upgradeable state");
    return AvailableUpgradeMode.unavailable();
}