public void reconcile()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java [96:176]


    public void reconcile(FlinkResourceContext<CR> ctx) throws Exception {
        // Entry point of one reconciliation pass for the resource carried by ctx. In order:
        //  1. bail out if the resource is not ready to be reconciled,
        //  2. perform the initial deployment if nothing has been deployed before,
        //  3. otherwise diff the last reconciled spec against the current one and handle
        //     rollback, scaling or a spec change,
        //  4. hand over to reconcileOtherChanges when no scale/spec action was executed.
        // Any exception thrown by one of these steps is propagated to the caller.
        var cr = ctx.getResource();
        var status = cr.getStatus();
        var reconciliationStatus = status.getReconciliationStatus();

        // If the resource is not ready for reconciliation we simply return
        if (!readyToReconcile(ctx)) {
            LOG.info("Not ready for reconciliation yet...");
            return;
        }

        // If this is the first deployment for the resource we simply submit the job and return.
        // No further logic is required at this point.
        if (reconciliationStatus.isBeforeFirstDeployment()) {
            var spec = cr.getSpec();
            var job = spec.getJob();

            // If the job is submitted in suspend state, no need to reconcile.
            // Enum constants are compared with == (as done for DiffType/ReconciliationState
            // below) so that an unset job state cannot raise a NullPointerException here; an
            // unset state is treated as "not suspended".
            if (job != null && job.getState() == JobState.SUSPENDED) {
                return;
            }

            LOG.info("Deploying for the first time");
            var deployConfig = ctx.getDeployConfig(spec);
            updateStatusBeforeFirstDeployment(
                    cr, spec, deployConfig, status, ctx.getKubernetesClient());

            deploy(ctx, spec, deployConfig, getInitialSnapshotPath(spec), false);

            ReconciliationUtils.updateStatusForDeployedSpec(cr, deployConfig, clock);
            return;
        }

        SPEC lastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec();
        SPEC currentDeploySpec = cr.getSpec();

        // Runs before the diff is built, so the diff is computed after the autoscaler step.
        applyAutoscaler(ctx);

        var reconciliationState = reconciliationStatus.getState();
        var specDiff =
                new ReflectiveDiffBuilder<>(
                                ctx.getDeploymentMode(), lastReconciledSpec, currentDeploySpec)
                        .build();
        var diffType = specDiff.getType();

        // A resource that is still in UPGRADING state is handled like a spec change even if
        // the diff itself is ignorable.
        boolean specChanged =
                DiffType.IGNORE != diffType || reconciliationState == ReconciliationState.UPGRADING;

        if (shouldRollBack(ctx, specChanged, lastReconciledSpec)) {
            prepareCrForRollback(ctx, specChanged, lastReconciledSpec);
            // A rollback is always processed as an upgrade-type spec change.
            specChanged = true;
            diffType = DiffType.UPGRADE;
        }

        if (specChanged) {
            // The spec is re-read from the resource here (after the rollback preparation above)
            // instead of reusing currentDeploySpec.
            var deployConfig = ctx.getDeployConfig(cr.getSpec());
            if (checkNewSpecAlreadyDeployed(cr, deployConfig)) {
                return;
            }
            triggerSpecChangeEvent(cr, specDiff, ctx.getKubernetesClient());

            // Try scaling if this is not an upgrade/redeploy change
            boolean scaled =
                    diffType != DiffType.SAVEPOINT_REDEPLOY
                            && diffType != DiffType.UPGRADE
                            && scale(ctx, deployConfig);

            // Reconcile spec change unless scaling was enough
            if (scaled || reconcileSpecChange(diffType, ctx, deployConfig, lastReconciledSpec)) {
                // If we executed a scale or spec upgrade action we return, otherwise we
                // continue to reconcile other changes
                return;
            }
        } else {
            ReconciliationUtils.updateReconciliationMetadata(cr);
        }

        if (!reconcileOtherChanges(ctx)) {
            LOG.info("Resource fully reconciled, nothing to do...");
        }
    }