in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java [111:210]
/**
 * Applies a detected spec change to the job by suspending and/or restoring it as required.
 *
 * <p>The flow, in order:
 *
 * <ol>
 *   <li>A {@link DiffType#SAVEPOINT_REDEPLOY} diff is delegated to {@code redeployWithSavepoint}
 *       and nothing else is done.
 *   <li>If the last reconciled spec had the job in {@code RUNNING} state, an upgrade is looked up
 *       via {@code getJobUpgrade}. When it is not available the method returns early. Otherwise
 *       the job is cancelled with the upgrade's suspend mode; an asynchronous cancellation marks
 *       the resource as {@code UPGRADING} and returns, a synchronous one records the restore
 *       mode and updates the status.
 *   <li>If the job is (now considered) {@code SUSPENDED} and the new spec wants it {@code
 *       RUNNING}, the status is patched and the job is restored.
 * </ol>
 *
 * @param diffType type of spec difference that triggered this reconciliation
 * @param ctx context giving access to the resource and the Kubernetes client
 * @param deployConfig configuration used for upgrade lookup, status updates and restore
 * @param lastReconciledSpec previously reconciled spec; its job upgrade mode may be overwritten
 *     here when the upgrade's suspend mode is {@code NOOP}
 * @return {@code true} on every path except when no upgrade is available and the upgrade allows
 *     other reconcile actions, in which case {@code false}
 * @throws Exception propagated from the cancel, restore or status operations
 */
protected boolean reconcileSpecChange(
        DiffType diffType,
        FlinkResourceContext<CR> ctx,
        Configuration deployConfig,
        SPEC lastReconciledSpec)
        throws Exception {
    var cr = ctx.getResource();
    STATUS crStatus = cr.getStatus();
    SPEC targetSpec = cr.getSpec();
    // Starts as the state from the last reconciled spec; may be switched to SUSPENDED below.
    JobState effectiveState = lastReconciledSpec.getJob().getState();
    JobState targetState = targetSpec.getJob().getState();

    if (diffType == DiffType.SAVEPOINT_REDEPLOY) {
        redeployWithSavepoint(ctx, deployConfig, cr, crStatus, targetSpec, targetState);
        return true;
    }

    if (effectiveState == JobState.RUNNING) {
        var upgrade = getJobUpgrade(ctx, deployConfig);
        boolean upgradeUnavailable = !upgrade.isAvailable();
        if (upgradeUnavailable) {
            // Even while waiting for an upgradeable state, other reconciliation actions may
            // still be permitted; the return value is derived from the upgrade's own flag.
            LOG.info(
                    "Job is not running and checkpoint information is not available for executing the upgrade, waiting for upgradeable state");
            return !upgrade.allowOtherReconcileActions;
        }
        LOG.debug("Job upgrade available: {}", upgrade);

        var mode = upgrade.getSuspendMode();
        boolean nothingToCancel = mode == SuspendMode.NOOP;
        if (!nothingToCancel) {
            // Only emit the suspend event when the suspend mode is something other than NOOP.
            eventRecorder.triggerEvent(
                    cr,
                    EventRecorder.Type.Normal,
                    EventRecorder.Reason.Suspended,
                    EventRecorder.Component.JobManagerDeployment,
                    MSG_SUSPENDED,
                    ctx.getKubernetesClient());
        }

        boolean cancellingInBackground = cancelJob(ctx, mode);
        if (cancellingInBackground) {
            // The cancellation finishes asynchronously: flag the resource as upgrading, store
            // the restore mode in the last reconciled spec and leave; the upgrade is finished
            // once the cancellation has completed.
            var reconciliationStatus = cr.getStatus().getReconciliationStatus();
            reconciliationStatus.setState(ReconciliationState.UPGRADING);
            ReconciliationUtils.updateLastReconciledSpec(
                    cr,
                    (spec, meta) -> {
                        spec.getJob().setUpgradeMode(upgrade.getRestoreMode());
                        meta.setFirstDeployment(false);
                    });
            return true;
        }

        // The upgrade mode that was actually used has to end up in the status later on.
        targetSpec.getJob().setUpgradeMode(upgrade.getRestoreMode());
        if (targetState != JobState.RUNNING) {
            ReconciliationUtils.updateStatusForDeployedSpec(cr, deployConfig, clock);
        } else {
            ReconciliationUtils.updateStatusBeforeDeploymentAttempt(cr, deployConfig, clock);
        }

        if (nothingToCancel) {
            // The job was already cancelled, so restore right away by treating it as
            // suspended. This is skipped after a real (possibly lengthy) cancel action so the
            // spec can be reconciled again first.
            lastReconciledSpec.getJob().setUpgradeMode(upgrade.getRestoreMode());
            effectiveState = JobState.SUSPENDED;
        }
    }

    boolean restoreRequired =
            effectiveState == JobState.SUSPENDED && targetState == JobState.RUNNING;
    if (!restoreRequired) {
        return true;
    }

    // Inherit the previous upgrade mode unless a stateless upgrade was explicitly requested.
    if (targetSpec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) {
        targetSpec.getJob().setUpgradeMode(lastReconciledSpec.getJob().getUpgradeMode());
    }

    // Record the target spec in an upgrading state and persist it before deploying.
    ReconciliationUtils.updateStatusBeforeDeploymentAttempt(cr, deployConfig, clock);
    statusRecorder.patchAndCacheStatus(cr, ctx.getKubernetesClient());

    // Whether HA is enforced depends on how the job was previously suspended.
    boolean enforceHa = lastReconciledSpec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE;
    restoreJob(ctx, targetSpec, deployConfig, enforceHa);

    ReconciliationUtils.updateStatusForDeployedSpec(cr, deployConfig, clock);
    return true;
}