in experiments/compositions/composition/internal/controller/expander_reconciler.go [164:412]
func (r *ExpanderReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
logger = logger.WithName(r.Composition.Name).WithName(r.InputGVK.Group)
logger.Info("Got Input API for expansion", "request", req)
inputcr := unstructured.Unstructured{}
inputcr.SetGroupVersionKind(r.InputGVK)
logger = logger.WithName(req.NamespacedName.Name).WithName(req.NamespacedName.Namespace)
if err := r.Get(ctx, req.NamespacedName, &inputcr); err != nil {
logger.Error(err, "unable to fetch Input API Object")
// we'll ignore not-found errors, since they can't be fixed by an immediate
// requeue (we'll need to wait for a new notification), and we can get them
// on deleted requests.
return ctrl.Result{}, client.IgnoreNotFound(err)
}
if !inputcr.GetDeletionTimestamp().IsZero() {
return r.reconcileDelete(ctx, logger, inputcr)
}
// Add a finalizer to prevent removal of facade before all applied objects are cleaned up.
if !controllerutil.ContainsFinalizer(&inputcr, finalizerName) {
controllerutil.AddFinalizer(&inputcr, finalizerName)
if err := r.Update(ctx, &inputcr); err != nil {
logger.Error(err, "Unable to add finalizer to input CR")
return ctrl.Result{}, err
}
}
// Grab the latest composition
// TODO(barni@) - Decide how we want the latest composition changes are to be applied.
var compositionCR compositionv1alpha1.Composition
if err := r.Client.Get(ctx, r.Composition, &compositionCR); err != nil {
if client.IgnoreNotFound(err) != nil {
logger.Error(err, "Unable to fetch Composition Object")
return ctrl.Result{}, err
}
}
// Associate a plan object with this input CR
planNN, plancr, err := r.getPlanForInputCR(ctx, &inputcr)
if err != nil {
logger.Error(err, "Unable to get Plan Object")
return ctrl.Result{}, err
}
// Since applylib doesnt support multiple apply batches we are
// tracking all old objects and accumulating them each apply.
oldAppliers := []*applier.Applier{}
// Create a new status for comparison later
newStatus := compositionv1alpha1.PlanStatus{
Stages: map[string]*compositionv1alpha1.StageStatus{},
// TODO: Accumulates LastPruned.
// Ideally we need to reset if input/composition gen changes
LastPruned: plancr.Status.LastPruned,
}
// Try updating status before returning
defer r.updatePlanStatus(ctx, plancr, &newStatus)
expanderDebugLogsEnabled := false
_, exist := inputcr.GetAnnotations()["composition-expander-debug-logs"]
if exist {
expanderDebugLogsEnabled = true
logger.Info("annotation'composition-expander-debug-logs' is turned on.")
}
logger.Info("annotation'composition-expander-debug-logs' is turned off.")
// Write out the (in-order) stages to the plan as a reference for later when we need to delete resources.
stagesEvaluated := []string{}
for index := range compositionCR.Spec.Expanders {
stagesEvaluated = append(stagesEvaluated, compositionCR.Spec.Expanders[index].Name)
}
metav1.SetMetaDataAnnotation(&plancr.ObjectMeta, stagesAnnotation, strings.Join(stagesEvaluated, ","))
err = r.Client.Update(ctx, plancr)
if err != nil {
logger.Error(err, "error setting annotation in plan", "plan", planNN, "annotation", stagesAnnotation)
return ctrl.Result{}, err
}
// Grab a top level logger so we can add expander name in the eval and apply sections
loggerCR := logger
stagesApplied := []string{}
values := map[string]interface{}{}
requeueAgain := false
// ---------- Evaluate and Apply expanders in order ---------------------
for index, expander := range compositionCR.Spec.Expanders {
planUpdated := false
reason := ""
// ------------------- EVALUATION SECTION -----------------------
values, planUpdated, reason, err = r.evaluate(ctx, logger, &inputcr, planNN, expander, values, expanderDebugLogsEnabled)
_, iswaitErr := err.(*EvaluateWaitError)
if iswaitErr {
newStatus.AppendWaitingCondition(expander.Name, err.Error(), reason)
// Subsume the error
requeueAgain = true
break
}
if err != nil {
// Skip apply phase and return
newStatus.AppendErrorCondition(expander.Name, err.Error(), reason)
return ctrl.Result{}, err
}
// Inject plan.Ready Condition with list of expanders
newStatus.ClearCondition(compositionv1alpha1.Ready)
message := fmt.Sprintf("Evaluated stage: %s", expander.Name)
newStatus.AppendCondition(compositionv1alpha1.Ready, metav1.ConditionFalse, message, "EvaluationPending")
// ------------------- APPLIER SECTION -----------------------
logger = loggerCR.WithName(expander.Name).WithName("Apply")
// Re-read the Plan CR to load the expanded manifests
oldGeneration := plancr.GetGeneration()
if err := r.Client.Get(ctx, planNN, plancr); err != nil {
logger.Error(err, "unable to read Plan CR")
return ctrl.Result{}, err
}
if planUpdated && oldGeneration == plancr.GetGeneration() {
logger.Info("Did not get the latest Plan CR. Will retry.", "generation", oldGeneration)
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}
stage, ok := plancr.Spec.Stages[expander.Name]
if !ok {
err := fmt.Errorf("plancr.spec.stages[%s] not found !!", stage)
logger.Error(err, "error applying stage", "stage", stage)
// This is not expected since we just processed the stage above
// We dont want to return error. Lets retry again in 20 secs.
return ctrl.Result{RequeueAfter: 20 * time.Second}, nil
}
if stage.Values != "" {
// This looks like an Getter stage. skip it
continue
}
// Lets not make empty manifests from a stage an error
// We may have conditional code that generates no manifests in a stage
// We will log it though
if stage.Manifest == "" {
logger.Info("Empty manifests returned for stage", "stage", stage)
}
// Create Applier and wait for the Applier to complete
ac := applier.ApplierClient{
Client: r.Client,
Dynamic: r.Dynamic,
RESTMapper: r.RESTMapper,
}
namespace := ""
if compositionCR.Spec.NamespaceMode != compositionv1alpha1.NamespaceModeExplicit {
namespace = inputcr.GetNamespace()
}
applier := applier.NewApplier(ctx, logger, ac, expander.Name, namespace, r.InputGVR.Resource, plancr, compositionCR.Spec.Readiness)
err = applier.Load() // Load Manifests
if err != nil {
r.Recorder.Event(&inputcr, "Warning", "ApplyFailed", fmt.Sprintf("error loading manifests for expander, name: %s", expander.Name))
logger.Error(err, "Unable to Load manifests for applying")
// Inject plan.ERROR Condition
newStatus.AppendErrorCondition(expander.Name, err.Error(), "FailedLoadingManifestsFromPlan")
return ctrl.Result{}, err
}
logger.Info("Successfully loaded manifests for applying")
if newStatus.Stages == nil {
newStatus.Stages = map[string]*compositionv1alpha1.StageStatus{}
}
newStatus.Stages[expander.Name] = &compositionv1alpha1.StageStatus{ResourceCount: applier.Count()}
// Prune only for the last expander section
prune := false
if index == len(compositionCR.Spec.Expanders)-1 {
prune = true
}
err = applier.Apply(oldAppliers, prune) // Apply Manifests
applier.UpdateStageStatus(&newStatus)
if err != nil {
r.Recorder.Event(&inputcr, "Warning", "ApplyFailed",
fmt.Sprintf("error applying manifests for expander, name: %s", expander.Name))
logger.Error(err, "Unable to apply manifests")
// Inject plan.ERROR Condition
newStatus.AppendErrorCondition(expander.Name, err.Error(), "FailedApplyingManifests")
return ctrl.Result{}, err
}
logger.Info("Successfully applied manifests")
r.Recorder.Event(&inputcr, "Normal", "ResourcesApplied", fmt.Sprintf("All expanded resources were applied. name: %s", expander.Name))
ready, err := applier.AreResourcesReady()
applier.UpdateStageStatus(&newStatus)
if err != nil {
r.Recorder.Event(&inputcr, "Warning", "ReconcileFailed", fmt.Sprintf("Failed waiting for resources to be reconciled. name: %s", expander.Name))
logger.Error(err, "Failed waiting for applied resources to reconcile")
// Inject plan.ERROR Condition
newStatus.AppendErrorCondition(expander.Name, err.Error(), "FailedWaitingForAppliedResources")
return ctrl.Result{}, err
}
oldAppliers = append(oldAppliers, applier)
if !ready {
r.Recorder.Event(&inputcr, "Warning", "ReconcileFailed", fmt.Sprintf("Some resources are not healthy. name: %s", expander.Name))
logger.Info("Applied successfully but some resources did not become healthy")
// Inject plan.Waiting Condition
newStatus.AppendWaitingCondition(expander.Name, "Not all resources are healthy", "WaitingForAppliedResources")
// Request a re-reconcile
requeueAgain = true
break
}
logger.Info("Applied resources successfully.")
// Implicit getter: Make the applied objects available in the values passed to subsequent stages
values = applier.AddAppliedObjectsIntoValues(values)
stagesApplied = append(stagesApplied, expander.Name)
r.Recorder.Event(&inputcr, "Normal", "ResourcesReconciled", fmt.Sprintf("All applied resources were reconciled. name: %s", expander.Name))
// Inject plan.Ready Condition with list of expanders
newStatus.ClearCondition(compositionv1alpha1.Ready)
message = fmt.Sprintf("Applied stages: %s", strings.Join(stagesApplied, ", "))
newStatus.AppendCondition(compositionv1alpha1.Ready, metav1.ConditionFalse, message, "PendingStages")
if expanderDebugLogsEnabled {
r.Recorder.Event(&inputcr, "Normal", fmt.Sprintf("Finished expander stage %d: %s", index, expander.Name), expanderDebugLog(&inputcr)+fmt.Sprintf("---resource count: %d", applier.Count()))
for i, resourceStatus := range newStatus.Stages[expander.Name].LastApplied {
logger.Info("Expander debug logs", "Resource", i, "Name", resourceStatus.Name, "Namespace", resourceStatus.Namespace, "Group",
resourceStatus.Group, "Version", resourceStatus.Version, "Kind", resourceStatus.Kind, "Status", resourceStatus.Health)
}
}
}
// Inject plan.Ready Condition with list of expanders
newStatus.ClearCondition(compositionv1alpha1.Ready)
message := fmt.Sprintf("Evaluated and Applied stages: %s", strings.Join(stagesApplied, ", "))
newStatus.AppendCondition(compositionv1alpha1.Ready, metav1.ConditionTrue, message, "ProcessedAllStages")
newStatus.InputGeneration = inputcr.GetGeneration()
newStatus.Generation = plancr.GetGeneration()
newStatus.CompositionGeneration = compositionCR.GetGeneration()
newStatus.CompositionUID = compositionCR.GetUID()
if requeueAgain {
return ctrl.Result{RequeueAfter: time.Second * 5}, nil
}
return ctrl.Result{}, nil
}