in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java [102:193]
public void reconcile(FlinkResourceContext<CR> ctx) throws Exception {
var cr = ctx.getResource();
var status = cr.getStatus();
var reconciliationStatus = cr.getStatus().getReconciliationStatus();
// If the resource is not ready for reconciliation we simply return
if (!readyToReconcile(ctx)) {
LOG.info("Not ready for reconciliation yet...");
return;
}
// If this is the first deployment for the resource we simply submit the job and return.
// No further logic is required at this point.
if (reconciliationStatus.isBeforeFirstDeployment()) {
LOG.info("Deploying for the first time");
var spec = cr.getSpec();
var deployConfig = ctx.getDeployConfig(spec);
updateStatusBeforeFirstDeployment(cr, spec, deployConfig, status);
deploy(
ctx,
spec,
deployConfig,
Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath),
false);
ReconciliationUtils.updateStatusForDeployedSpec(cr, deployConfig, clock);
return;
}
SPEC lastReconciledSpec =
cr.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
SPEC currentDeploySpec = cr.getSpec();
applyAutoscalerParallelismOverrides(
resourceScaler.getParallelismOverrides(ctx), currentDeploySpec);
var specDiff =
new ReflectiveDiffBuilder<>(
ctx.getDeploymentMode(), lastReconciledSpec, currentDeploySpec)
.build();
var diffType = specDiff.getType();
boolean specChanged =
DiffType.IGNORE != diffType
|| reconciliationStatus.getState() == ReconciliationState.UPGRADING;
if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
specChanged = prepareCrForRollback(ctx, currentDeploySpec, lastReconciledSpec);
}
var observeConfig = ctx.getObserveConfig();
if (specChanged) {
var deployConfig = ctx.getDeployConfig(cr.getSpec());
if (checkNewSpecAlreadyDeployed(cr, deployConfig)) {
return;
}
triggerSpecChangeEvent(cr, specDiff);
// Try scaling if this is not an upgrade change
boolean scaled = diffType != DiffType.UPGRADE && scale(ctx, deployConfig);
// Reconcile spec change unless scaling was enough
if (scaled || reconcileSpecChange(ctx, deployConfig)) {
// If we executed a scale or spec upgrade action we return, otherwise we
// continue to reconcile other changes
return;
}
} else {
ReconciliationUtils.updateReconciliationMetadata(cr);
}
if (shouldRollBack(ctx, observeConfig)) {
// Rollbacks are executed in two steps, we initiate it first then return
if (initiateRollBack(status)) {
return;
}
LOG.warn(MSG_ROLLBACK);
eventRecorder.triggerEvent(
cr,
EventRecorder.Type.Normal,
EventRecorder.Reason.Rollback,
EventRecorder.Component.JobManagerDeployment,
MSG_ROLLBACK);
} else if (!reconcileOtherChanges(ctx)) {
if (resourceScaler.scale(ctx)) {
LOG.info(
"Rescheduling new reconciliation immediately to execute scaling operation.");
status.setImmediateReconciliationNeeded(true);
} else {
LOG.info("Resource fully reconciled, nothing to do...");
}
}
}