in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java [79:123]
protected JobUpgrade getJobUpgrade(
        FlinkResourceContext<FlinkDeployment> ctx, Configuration deployConfig)
        throws Exception {
    // Decides how the job can be upgraded. The superclass decision is returned untouched when
    // it is already available or when it does not allow a fallback; otherwise the fallback
    // checks below are evaluated in order. Throws UpgradeFailureException when the JobManager
    // deployment is MISSING/ERROR and the Flink service reports no HA metadata.
    var resource = ctx.getResource();
    var resourceStatus = resource.getStatus();

    var inherited = super.getJobUpgrade(ctx, deployConfig);
    boolean fallbackNeeded = !inherited.isAvailable() && inherited.isAllowFallback();
    if (!fallbackNeeded) {
        return inherited;
    }

    var service = ctx.getFlinkService();

    // HA has to be activated in both the deploy config and the observe config before the
    // service is asked whether HA metadata exists.
    boolean haActivated =
            HighAvailabilityMode.isHighAvailabilityModeActivated(deployConfig)
                    && HighAvailabilityMode.isHighAvailabilityModeActivated(
                            ctx.getObserveConfig());
    if (haActivated && service.isHaMetadataAvailable(deployConfig)) {
        LOG.info(
                "Job is not running but HA metadata is available for last state restore, ready for upgrade");
        return JobUpgrade.lastStateUsingHaMeta();
    }

    var jmStatus = resourceStatus.getJobManagerDeploymentStatus();

    // The last reconciled spec is only deserialized when the JobManager deployment is not
    // MISSING, and the pod check only runs when that spec's upgrade mode is not LAST_STATE.
    if (jmStatus != JobManagerDeploymentStatus.MISSING) {
        var lastReconciledMode =
                resourceStatus
                        .getReconciliationStatus()
                        .deserializeLastReconciledSpec()
                        .getJob()
                        .getUpgradeMode();
        if (lastReconciledMode != UpgradeMode.LAST_STATE
                && FlinkUtils.jmPodNeverStarted(ctx.getJosdkContext())) {
            deleteJmThatNeverStarted(service, resource, deployConfig);
            // Re-run the whole decision now that the never-started JobManager was deleted.
            return getJobUpgrade(ctx, deployConfig);
        }
    }

    boolean jmMissingOrFailed =
            jmStatus == JobManagerDeploymentStatus.MISSING
                    || jmStatus == JobManagerDeploymentStatus.ERROR;
    if (jmMissingOrFailed && !service.isHaMetadataAvailable(deployConfig)) {
        throw new UpgradeFailureException(
                "JobManager deployment is missing and HA data is not available to make stateful upgrades. "
                        + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
                        + "Manual restore required.",
                "UpgradeFailed");
    }

    // None of the fallbacks applied and nothing is fatally wrong: no upgrade possible yet.
    return JobUpgrade.unavailable();
}