public void reconcile()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java [102:193]


    /**
     * Runs a single reconciliation pass for the resource carried by {@code ctx}.
     *
     * <p>The steps are evaluated in order and the pass returns as soon as one of them acts:
     *
     * <ol>
     *   <li>return immediately while {@code readyToReconcile} reports {@code false};
     *   <li>before the first deployment, deploy the current spec and record it as deployed;
     *   <li>on a spec change, try scaling (non-upgrade diffs only) and then
     *       {@code reconcileSpecChange};
     *   <li>if a rollback is due, initiate it, or log a warning and emit a rollback event;
     *   <li>otherwise run {@code reconcileOtherChanges} and, if that did nothing, ask the
     *       resource scaler to scale.
     * </ol>
     *
     * @param ctx context providing the resource, its configurations and the deployment mode
     * @throws Exception if any of the invoked reconciliation steps fails
     */
    public void reconcile(FlinkResourceContext<CR> ctx) throws Exception {
        var resource = ctx.getResource();
        var resourceStatus = resource.getStatus();
        var reconStatus = resource.getStatus().getReconciliationStatus();

        // Nothing to do until the resource reports that it can be reconciled.
        if (!readyToReconcile(ctx)) {
            LOG.info("Not ready for reconciliation yet...");
            return;
        }

        // First deployment: submit the job and stop, no diffing is required yet.
        if (reconStatus.isBeforeFirstDeployment()) {
            LOG.info("Deploying for the first time");
            var initialSpec = resource.getSpec();
            var initialDeployConfig = ctx.getDeployConfig(initialSpec);
            updateStatusBeforeFirstDeployment(
                    resource, initialSpec, initialDeployConfig, resourceStatus);

            var initialSavepoint =
                    Optional.ofNullable(initialSpec.getJob())
                            .map(JobSpec::getInitialSavepointPath);
            deploy(ctx, initialSpec, initialDeployConfig, initialSavepoint, false);

            ReconciliationUtils.updateStatusForDeployedSpec(resource, initialDeployConfig, clock);
            return;
        }

        SPEC previousSpec =
                resource.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
        SPEC desiredSpec = resource.getSpec();
        // The autoscaler overrides are applied to the current spec before the diff is built.
        applyAutoscalerParallelismOverrides(
                resourceScaler.getParallelismOverrides(ctx), desiredSpec);

        var specDiff =
                new ReflectiveDiffBuilder<>(ctx.getDeploymentMode(), previousSpec, desiredSpec)
                        .build();
        var diffType = specDiff.getType();

        // A resource in UPGRADING state is treated as changed even if the diff is ignorable.
        boolean specChanged =
                diffType != DiffType.IGNORE
                        || ReconciliationState.UPGRADING == reconStatus.getState();

        // While rolling back, the rollback preparation decides whether a change must be applied.
        if (ReconciliationState.ROLLING_BACK == reconStatus.getState()) {
            specChanged = prepareCrForRollback(ctx, desiredSpec, previousSpec);
        }

        var observeConfig = ctx.getObserveConfig();
        if (!specChanged) {
            ReconciliationUtils.updateReconciliationMetadata(resource);
        } else {
            var deployConfig = ctx.getDeployConfig(resource.getSpec());
            if (checkNewSpecAlreadyDeployed(resource, deployConfig)) {
                return;
            }
            triggerSpecChangeEvent(resource, specDiff);

            // Scaling is only attempted for non-upgrade diffs; if it succeeds we are done.
            if (diffType != DiffType.UPGRADE && scale(ctx, deployConfig)) {
                return;
            }

            // Otherwise reconcile the spec change. When no action was executed we fall through
            // and continue with the remaining checks below.
            if (reconcileSpecChange(ctx, deployConfig)) {
                return;
            }
        }

        if (shouldRollBack(ctx, observeConfig)) {
            // Rollbacks are executed in two steps: when the rollback is initiated in this pass
            // we return right away, otherwise we report it.
            if (!initiateRollBack(resourceStatus)) {
                LOG.warn(MSG_ROLLBACK);
                eventRecorder.triggerEvent(
                        resource,
                        EventRecorder.Type.Normal,
                        EventRecorder.Reason.Rollback,
                        EventRecorder.Component.JobManagerDeployment,
                        MSG_ROLLBACK);
            }
            return;
        }

        if (reconcileOtherChanges(ctx)) {
            return;
        }

        if (!resourceScaler.scale(ctx)) {
            LOG.info("Resource fully reconciled, nothing to do...");
            return;
        }

        LOG.info("Rescheduling new reconciliation immediately to execute scaling operation.");
        resourceStatus.setImmediateReconciliationNeeded(true);
    }