in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java [114:170]
/**
 * Records the outcome of a spec reconciliation in the status of {@code target}.
 *
 * <p>The method clears the status error, stamps the reconciliation time, advances the
 * reconciliation state and stores a (possibly adjusted) spec as the last reconciled spec.
 *
 * @param target resource whose status is updated in place
 * @param stateAfterReconcile job state written into the stored spec when the resource has a job
 * @param conf configuration forwarded to {@code getTaskManagerInfo} for Flink deployments
 * @param upgrading selects the in-progress state ({@code UPGRADING} / {@code ROLLING_BACK})
 *     instead of the terminal one ({@code DEPLOYED} / {@code ROLLED_BACK})
 * @param clock source of the reconciliation timestamp
 * @param <SPEC> concrete spec type of the resource
 */
private static <SPEC extends AbstractFlinkSpec> void updateStatusForSpecReconciliation(
        AbstractFlinkResource<SPEC, ?> target,
        JobState stateAfterReconcile,
        Configuration conf,
        boolean upgrading,
        Clock clock) {
    var status = target.getStatus();
    var spec = target.getSpec();
    var reconciliationStatus = status.getReconciliationStatus();

    // Reset the reported error and stamp this reconciliation.
    status.setError(null);
    reconciliationStatus.setReconciliationTimestamp(clock.instant().toEpochMilli());

    // Whether a rollback was already in progress when this method was entered.
    boolean rollbackInProgress =
            reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK;

    ReconciliationState nextState;
    if (rollbackInProgress) {
        nextState = upgrading ? ReconciliationState.ROLLING_BACK : ReconciliationState.ROLLED_BACK;
    } else {
        nextState = upgrading ? ReconciliationState.UPGRADING : ReconciliationState.DEPLOYED;
    }
    reconciliationStatus.setState(nextState);

    // The state set above is ROLLING_BACK or ROLLED_BACK exactly when a rollback was in
    // progress on entry. In that case the previously reconciled spec is re-recorded,
    // otherwise a copy of the current spec is used.
    SPEC specToRecord;
    if (rollbackInProgress) {
        specToRecord = reconciliationStatus.deserializeLastReconciledSpec();
    } else {
        specToRecord = ReconciliationUtils.clone(spec);
    }

    var desiredJob = spec.getJob();
    boolean hasJob = desiredJob != null;
    if (hasJob) {
        // For jobs the recorded spec carries the state reached by this reconciliation.
        var recordedJob = specToRecord.getJob();
        recordedJob.setState(stateAfterReconcile);

        var previousSpec = reconciliationStatus.deserializeLastReconciledSpec();
        if (previousSpec != null) {
            // Keep the previously recorded snapshot trigger nonces so that new triggers
            // are not lost during an upgrade.
            var previousJob = previousSpec.getJob();
            recordedJob.setSavepointTriggerNonce(previousJob.getSavepointTriggerNonce());
            recordedJob.setCheckpointTriggerNonce(previousJob.getCheckpointTriggerNonce());
        }

        if (target instanceof FlinkDeployment) {
            // Application deployments additionally get their taskmanager info refreshed.
            var deploymentStatus = (FlinkDeploymentStatus) status;
            deploymentStatus.setTaskManager(
                    getTaskManagerInfo(
                            target.getMetadata().getName(), conf, stateAfterReconcile));
        }
    }

    reconciliationStatus.serializeAndSetLastReconciledSpec(specToRecord, target);

    if (hasJob && desiredJob.getState() == JobState.SUSPENDED) {
        // A job suspended by the user is automatically marked stable.
        reconciliationStatus.markReconciledSpecAsStable();
    }
}