private static void updateStatusForSpecReconciliation()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java [114:170]


    /**
     * Updates the status of {@code target} after its spec has been reconciled.
     *
     * <p>The method clears the status error, stamps the reconciliation time taken from {@code
     * clock}, moves the reconciliation state forward and records the last reconciled spec:
     *
     * <ul>
     *   <li>If the state was {@code ROLLING_BACK}, it stays {@code ROLLING_BACK} while {@code
     *       upgrading} and becomes {@code ROLLED_BACK} otherwise. In any other case it becomes
     *       {@code UPGRADING} while {@code upgrading} and {@code DEPLOYED} otherwise.
     *   <li>When the resulting state is {@code ROLLING_BACK} or {@code ROLLED_BACK}, the previously
     *       recorded last reconciled spec is re-recorded; otherwise a clone of the current spec is
     *       recorded.
     *   <li>If the current spec defines a job, the recorded job state is set to {@code
     *       stateAfterReconcile} and the savepoint/checkpoint trigger nonces are taken from the
     *       previously recorded spec (when one exists). For {@code FlinkDeployment} resources the
     *       task manager info in the status is refreshed as well.
     *   <li>If the current spec's job state is {@code SUSPENDED}, the recorded spec is marked
     *       stable.
     * </ul>
     *
     * @param target resource whose status is updated in place
     * @param stateAfterReconcile job state to store in the recorded spec
     * @param conf configuration handed to {@code getTaskManagerInfo}
     * @param upgrading whether an upgrade is still in progress
     * @param clock source of the reconciliation timestamp
     * @param <SPEC> spec type of the resource
     */
    private static <SPEC extends AbstractFlinkSpec> void updateStatusForSpecReconciliation(
            AbstractFlinkResource<SPEC, ?> target,
            JobState stateAfterReconcile,
            Configuration conf,
            boolean upgrading,
            Clock clock) {

        var resourceStatus = target.getStatus();
        var desiredSpec = target.getSpec();
        var reconStatus = resourceStatus.getReconciliationStatus();

        // Any previously reported error is wiped by a spec reconciliation
        resourceStatus.setError(null);
        reconStatus.setReconciliationTimestamp(clock.instant().toEpochMilli());

        // Decide the next reconciliation state from the current one and the upgrade flag
        boolean rollbackInProgress = reconStatus.getState() == ReconciliationState.ROLLING_BACK;
        ReconciliationState nextState;
        if (upgrading) {
            nextState =
                    rollbackInProgress
                            ? ReconciliationState.ROLLING_BACK
                            : ReconciliationState.UPGRADING;
        } else {
            nextState =
                    rollbackInProgress
                            ? ReconciliationState.ROLLED_BACK
                            : ReconciliationState.DEPLOYED;
        }
        reconStatus.setState(nextState);

        // Pick the spec to record: the current one, or the last reconciled one during rollbacks
        var recordedState = reconStatus.getState();
        boolean rollbackRelated =
                recordedState == ReconciliationState.ROLLING_BACK
                        || recordedState == ReconciliationState.ROLLED_BACK;
        SPEC specToRecord;
        if (!rollbackRelated) {
            specToRecord = ReconciliationUtils.clone(desiredSpec);
        } else {
            specToRecord = reconStatus.deserializeLastReconciledSpec();
        }

        // Without a job there is nothing to adjust, the spec is recorded as is
        if (desiredSpec.getJob() == null) {
            reconStatus.serializeAndSetLastReconciledSpec(specToRecord, target);
            return;
        }

        // With a job the recorded spec has to reflect the state reached by the reconciliation
        var recordedJob = specToRecord.getJob();
        recordedJob.setState(stateAfterReconcile);

        var previousSpec = reconStatus.deserializeLastReconciledSpec();
        if (previousSpec != null) {
            // Keep the last recorded snapshot triggers so new triggers are not lost during upgrade
            var previousJob = previousSpec.getJob();
            recordedJob.setSavepointTriggerNonce(previousJob.getSavepointTriggerNonce());
            recordedJob.setCheckpointTriggerNonce(previousJob.getCheckpointTriggerNonce());
        }

        if (target instanceof FlinkDeployment) {
            // Application deployments also get their taskmanager info refreshed
            var deploymentStatus = (FlinkDeploymentStatus) resourceStatus;
            deploymentStatus.setTaskManager(
                    getTaskManagerInfo(target.getMetadata().getName(), conf, stateAfterReconcile));
        }

        reconStatus.serializeAndSetLastReconciledSpec(specToRecord, target);

        // A job suspended by the user is automatically marked stable
        if (desiredSpec.getJob().getState() == JobState.SUSPENDED) {
            reconStatus.markReconciledSpecAsStable();
        }
    }