public static void updateStatusForSpecReconciliation()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java [120:188]


    public static <SPEC extends AbstractFlinkSpec> void updateStatusForSpecReconciliation(
            AbstractFlinkResource<SPEC, ?> target,
            JobState stateAfterReconcile,
            Configuration conf,
            boolean upgrading,
            Clock clock) {

        var status = target.getStatus();
        var spec = target.getSpec();
        var reconciliationStatus = status.getReconciliationStatus();

        // Clear errors
        status.setError(null);
        reconciliationStatus.setReconciliationTimestamp(clock.instant().toEpochMilli());

        var state = reconciliationStatus.getState();
        if (state == ReconciliationState.ROLLING_BACK) {
            state = upgrading ? ReconciliationState.ROLLING_BACK : ReconciliationState.ROLLED_BACK;
        } else {
            state = upgrading ? ReconciliationState.UPGRADING : ReconciliationState.DEPLOYED;
        }
        reconciliationStatus.setState(state);

        if (state == ReconciliationState.ROLLING_BACK || state == ReconciliationState.ROLLED_BACK) {
            var lastSpecWithMeta = reconciliationStatus.deserializeLastReconciledSpecWithMeta();
            var job = lastSpecWithMeta.getSpec().getJob();
            if (job != null) {
                // During the rollback we have to update the upgradeMode in the lastReconciledSpec
                // based on the rollback upgradeMode, this ensures that the next upgrade can be
                // executed correctly and we don't accidentally lose state.
                job.setUpgradeMode(spec.getJob().getUpgradeMode());
                reconciliationStatus.setLastReconciledSpec(
                        SpecUtils.writeSpecWithMeta(
                                lastSpecWithMeta.getSpec(), lastSpecWithMeta.getMeta()));
            }
        } else {
            SPEC clonedSpec = ReconciliationUtils.clone(spec);
            if (spec.getJob() != null) {
                // For jobs we have to adjust the reconciled spec
                var job = clonedSpec.getJob();
                job.setState(stateAfterReconcile);

                var lastSpec = reconciliationStatus.deserializeLastReconciledSpec();
                if (lastSpec != null) {
                    // We preserve the last snapshot triggers to not lose new triggers during
                    // upgrade
                    job.setSavepointTriggerNonce(lastSpec.getJob().getSavepointTriggerNonce());
                    job.setCheckpointTriggerNonce(lastSpec.getJob().getCheckpointTriggerNonce());
                }

                if (target instanceof FlinkDeployment) {
                    // For application deployments we update the taskmanager info
                    ((FlinkDeploymentStatus) status)
                            .setTaskManager(
                                    getTaskManagerInfo(
                                            target.getMetadata().getName(),
                                            conf,
                                            stateAfterReconcile));
                }
                reconciliationStatus.serializeAndSetLastReconciledSpec(clonedSpec, target);
                if (spec.getJob().getState() == JobState.SUSPENDED) {
                    // When a job is suspended by the user it is automatically marked stable
                    reconciliationStatus.markReconciledSpecAsStable();
                }
            } else {
                reconciliationStatus.serializeAndSetLastReconciledSpec(clonedSpec, target);
            }
        }
    }