in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java [120:188]
public static <SPEC extends AbstractFlinkSpec> void updateStatusForSpecReconciliation(
        AbstractFlinkResource<SPEC, ?> target,
        JobState stateAfterReconcile,
        Configuration conf,
        boolean upgrading,
        Clock clock) {
    // Overview: clears the recorded error, stamps the reconciliation time, advances the
    // reconciliation state and finally refreshes the stored lastReconciledSpec. While a
    // rollback is in flight only the upgradeMode of the stored spec is patched; otherwise a
    // clone of the current spec is written as the new lastReconciledSpec.
    var resourceStatus = target.getStatus();
    var currentSpec = target.getSpec();
    var reconStatus = resourceStatus.getReconciliationStatus();

    // A fresh spec reconciliation wipes whatever error was recorded before.
    resourceStatus.setError(null);
    var reconciledAt = clock.instant();
    reconStatus.setReconciliationTimestamp(reconciledAt.toEpochMilli());

    // The next state depends on two inputs only: whether we were rolling back before this
    // call, and whether the upgrade is still in progress.
    boolean wasRollingBack = reconStatus.getState() == ReconciliationState.ROLLING_BACK;
    ReconciliationState nextState;
    if (upgrading) {
        nextState =
                wasRollingBack ? ReconciliationState.ROLLING_BACK : ReconciliationState.UPGRADING;
    } else {
        nextState =
                wasRollingBack ? ReconciliationState.ROLLED_BACK : ReconciliationState.DEPLOYED;
    }
    reconStatus.setState(nextState);

    // nextState is ROLLING_BACK or ROLLED_BACK exactly when the previous state was
    // ROLLING_BACK, so the flag computed above identifies the rollback path.
    if (wasRollingBack) {
        var storedSpecWithMeta = reconStatus.deserializeLastReconciledSpecWithMeta();
        var storedSpec = storedSpecWithMeta.getSpec();
        var storedJob = storedSpec.getJob();
        if (storedJob == null) {
            // Nothing job-related to adjust in the stored spec.
            return;
        }
        // The lastReconciledSpec must carry the upgradeMode used for the rollback so that
        // the following upgrade is executed correctly and state is not accidentally lost.
        storedJob.setUpgradeMode(currentSpec.getJob().getUpgradeMode());
        var serializedSpec =
                SpecUtils.writeSpecWithMeta(storedSpec, storedSpecWithMeta.getMeta());
        reconStatus.setLastReconciledSpec(serializedSpec);
        return;
    }

    SPEC specToStore = ReconciliationUtils.clone(currentSpec);
    if (currentSpec.getJob() == null) {
        // No job section: the cloned spec is stored unchanged.
        reconStatus.serializeAndSetLastReconciledSpec(specToStore, target);
        return;
    }

    // With a job present, the stored spec records the job state reached by this reconcile.
    var jobToStore = specToStore.getJob();
    jobToStore.setState(stateAfterReconcile);

    var previousSpec = reconStatus.deserializeLastReconciledSpec();
    if (previousSpec != null) {
        // Carry over the previously reconciled snapshot trigger nonces so that triggers
        // newly set in the current spec are not lost during the upgrade.
        var previousJob = previousSpec.getJob();
        jobToStore.setSavepointTriggerNonce(previousJob.getSavepointTriggerNonce());
        previousJob = previousSpec.getJob();
        jobToStore.setCheckpointTriggerNonce(previousJob.getCheckpointTriggerNonce());
    }

    if (target instanceof FlinkDeployment) {
        // FlinkDeployment resources additionally get their task manager info refreshed.
        var deploymentStatus = (FlinkDeploymentStatus) resourceStatus;
        var resourceName = target.getMetadata().getName();
        deploymentStatus.setTaskManager(
                getTaskManagerInfo(resourceName, conf, stateAfterReconcile));
    }

    reconStatus.serializeAndSetLastReconciledSpec(specToStore, target);

    // A job suspended by the user is automatically considered stable.
    boolean suspendedByUser = currentSpec.getJob().getState() == JobState.SUSPENDED;
    if (suspendedByUser) {
        reconStatus.markReconciledSpecAsStable();
    }
}