in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java [96:176]
public void reconcile(FlinkResourceContext<CR> ctx) throws Exception {
var cr = ctx.getResource();
var status = cr.getStatus();
var reconciliationStatus = cr.getStatus().getReconciliationStatus();
// If the resource is not ready for reconciliation we simply return
if (!readyToReconcile(ctx)) {
LOG.info("Not ready for reconciliation yet...");
return;
}
// If this is the first deployment for the resource we simply submit the job and return.
// No further logic is required at this point.
if (reconciliationStatus.isBeforeFirstDeployment()) {
var spec = cr.getSpec();
// If the job is submitted in suspend state, no need to reconcile
if (spec.getJob() != null && spec.getJob().getState().equals(JobState.SUSPENDED)) {
return;
}
LOG.info("Deploying for the first time");
var deployConfig = ctx.getDeployConfig(spec);
updateStatusBeforeFirstDeployment(
cr, spec, deployConfig, status, ctx.getKubernetesClient());
deploy(ctx, spec, deployConfig, getInitialSnapshotPath(spec), false);
ReconciliationUtils.updateStatusForDeployedSpec(cr, deployConfig, clock);
return;
}
SPEC lastReconciledSpec =
cr.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
SPEC currentDeploySpec = cr.getSpec();
applyAutoscaler(ctx);
var reconciliationState = reconciliationStatus.getState();
var specDiff =
new ReflectiveDiffBuilder<>(
ctx.getDeploymentMode(), lastReconciledSpec, currentDeploySpec)
.build();
var diffType = specDiff.getType();
boolean specChanged =
DiffType.IGNORE != diffType || reconciliationState == ReconciliationState.UPGRADING;
if (shouldRollBack(ctx, specChanged, lastReconciledSpec)) {
prepareCrForRollback(ctx, specChanged, lastReconciledSpec);
specChanged = true;
diffType = DiffType.UPGRADE;
}
if (specChanged) {
var deployConfig = ctx.getDeployConfig(cr.getSpec());
if (checkNewSpecAlreadyDeployed(cr, deployConfig)) {
return;
}
triggerSpecChangeEvent(cr, specDiff, ctx.getKubernetesClient());
// Try scaling if this is not an upgrade/redeploy change
boolean scaled =
diffType != DiffType.SAVEPOINT_REDEPLOY
&& diffType != DiffType.UPGRADE
&& scale(ctx, deployConfig);
// Reconcile spec change unless scaling was enough
if (scaled || reconcileSpecChange(diffType, ctx, deployConfig, lastReconciledSpec)) {
// If we executed a scale or spec upgrade action we return, otherwise we
// continue to reconcile other changes
return;
}
} else {
ReconciliationUtils.updateReconciliationMetadata(cr);
}
if (!reconcileOtherChanges(ctx)) {
LOG.info("Resource fully reconciled, nothing to do...");
}
}