in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java [90:152]
protected boolean reconcileSpecChange(FlinkResourceContext<CR> ctx, Configuration deployConfig)
throws Exception {
var resource = ctx.getResource();
STATUS status = resource.getStatus();
var reconciliationStatus = status.getReconciliationStatus();
SPEC lastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec();
SPEC currentDeploySpec = resource.getSpec();
JobState currentJobState = lastReconciledSpec.getJob().getState();
JobState desiredJobState = currentDeploySpec.getJob().getState();
if (currentJobState == JobState.RUNNING) {
if (desiredJobState == JobState.RUNNING) {
LOG.info("Upgrading/Restarting running job, suspending first...");
}
AvailableUpgradeMode availableUpgradeMode = getAvailableUpgradeMode(ctx, deployConfig);
if (!availableUpgradeMode.isAvailable()) {
return false;
}
eventRecorder.triggerEvent(
resource,
EventRecorder.Type.Normal,
EventRecorder.Reason.Suspended,
EventRecorder.Component.JobManagerDeployment,
MSG_SUSPENDED);
UpgradeMode upgradeMode = availableUpgradeMode.getUpgradeMode().get();
// We must record the upgrade mode used to the status later
currentDeploySpec.getJob().setUpgradeMode(upgradeMode);
cancelJob(ctx, upgradeMode);
if (desiredJobState == JobState.RUNNING) {
ReconciliationUtils.updateStatusBeforeDeploymentAttempt(
resource, deployConfig, clock);
} else {
ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConfig, clock);
}
}
if (currentJobState == JobState.SUSPENDED && desiredJobState == JobState.RUNNING) {
// We inherit the upgrade mode unless stateless upgrade requested
if (currentDeploySpec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) {
currentDeploySpec
.getJob()
.setUpgradeMode(lastReconciledSpec.getJob().getUpgradeMode());
}
// We record the target spec into an upgrading state before deploying
ReconciliationUtils.updateStatusBeforeDeploymentAttempt(resource, deployConfig, clock);
statusRecorder.patchAndCacheStatus(resource);
restoreJob(
ctx,
currentDeploySpec,
deployConfig,
// We decide to enforce HA based on how job was previously suspended
lastReconciledSpec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE);
ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConfig, clock);
}
return true;
}