internal/controllers/symphony/controller.go (288 lines of code) (raw):
package symphony
import (
"context"
"fmt"
"slices"
"sort"
apiv1 "github.com/Azure/eno/api/v1"
"github.com/Azure/eno/internal/manager"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
type symphonyController struct {
client client.Client
noCacheClient client.Reader
}
func NewController(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&apiv1.Symphony{}).
Owns(&apiv1.Composition{}).
WithLogConstructor(manager.NewLogConstructor(mgr, "symphonyController")).
Complete(&symphonyController{
client: mgr.GetClient(),
noCacheClient: mgr.GetAPIReader(),
})
}
func (c *symphonyController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := logr.FromContextOrDiscard(ctx)
symph := &apiv1.Symphony{}
err := c.client.Get(ctx, req.NamespacedName, symph)
if err != nil {
logger.Error(err, "failed to get symphony")
return ctrl.Result{}, client.IgnoreNotFound(err)
}
logger = logger.WithValues("symphonyName", symph.Name, "symphonyNamespace", symph.Namespace, "symphonyGeneration", symph.Generation)
ctx = logr.NewContext(ctx, logger)
if controllerutil.AddFinalizer(symph, "eno.azure.io/cleanup") {
err := c.client.Update(ctx, symph)
if err != nil {
logger.Error(err, "failed to add finalizer")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
existing := &apiv1.CompositionList{}
err = c.client.List(ctx, existing, client.InNamespace(symph.Namespace), client.MatchingFields{
manager.IdxCompositionsBySymphony: symph.Name,
})
if err != nil {
logger.Error(err, "failed to list existing compositions")
return ctrl.Result{}, err
}
modified, err := c.reconcileReverse(ctx, symph, existing)
if err != nil {
logger.Error(err, "failed to reconcile reverse")
return ctrl.Result{}, err
}
if modified {
return ctrl.Result{}, nil
}
// Remove finalizer when no compositions remain
if symph.DeletionTimestamp != nil {
if len(existing.Items) > 0 || !controllerutil.RemoveFinalizer(symph, "eno.azure.io/cleanup") {
return ctrl.Result{}, nil
}
err = c.client.Update(ctx, symph)
if err != nil {
logger.Error(err, "failed to remove finalizer")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
modified, err = c.reconcileForward(ctx, symph, existing)
if err != nil {
logger.Error(err, "failed to reconcile forward")
return ctrl.Result{}, err
}
if modified {
return ctrl.Result{}, nil
}
err = c.syncStatus(ctx, symph, existing)
if err != nil {
logger.Error(err, "failed to sync status")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
func (c *symphonyController) reconcileReverse(ctx context.Context, symph *apiv1.Symphony, comps *apiv1.CompositionList) (bool, error) {
logger := logr.FromContextOrDiscard(ctx)
expectedSynths := map[string]struct{}{}
for _, variation := range symph.Spec.Variations {
expectedSynths[variation.Synthesizer.Name] = struct{}{}
}
// Delete compositions when their synth has been removed from the symphony
existingBySynthName := map[string][]*apiv1.Composition{}
for _, comp := range comps.Items {
comp := comp
existingBySynthName[comp.Spec.Synthesizer.Name] = append(existingBySynthName[comp.Spec.Synthesizer.Name], &comp)
hasVariation := slices.ContainsFunc(symph.Spec.Variations, func(vrn apiv1.Variation) bool {
return vrn.Synthesizer.Name == comp.Spec.Synthesizer.Name
})
if (hasVariation && symph.DeletionTimestamp == nil) || comp.DeletionTimestamp != nil {
continue
}
err := c.client.Delete(ctx, &comp)
if err != nil {
return false, fmt.Errorf("cleaning up composition: %w", err)
}
logger.V(0).Info("deleted composition because its variation was removed from the symphony", "compositionName", comp.Name, "compositionNamespace", comp.Namespace)
return true, nil
}
// Delete any duplicates we may have created in the past - leave the oldest one
for _, comps := range existingBySynthName {
if len(comps) < 2 {
continue
}
sort.Slice(comps, func(i, j int) bool { return comps[i].CreationTimestamp.Before(&comps[j].CreationTimestamp) })
err := c.client.Delete(ctx, comps[0])
if err != nil {
return false, fmt.Errorf("deleting duplicate composition: %w", err)
}
logger.V(0).Info("deleted composition because it's a duplicate", "compositionName", comps[0].Name, "compositionNamespace", comps[0].Namespace)
return true, nil
}
return false, nil
}
func (c *symphonyController) reconcileForward(ctx context.Context, symph *apiv1.Symphony, comps *apiv1.CompositionList) (modified bool, err error) {
logger := logr.FromContextOrDiscard(ctx)
for _, variation := range symph.Spec.Variations {
variation := variation
comp := &apiv1.Composition{}
comp.Namespace = symph.Namespace
comp.GenerateName = variation.Synthesizer.Name + "-"
comp.Spec.Bindings = getBindings(symph, &variation)
comp.Spec.Synthesizer = variation.Synthesizer
comp.Spec.SynthesisEnv = getSynthesisEnv(symph, &variation)
comp.Labels = variation.Labels
comp.Annotations = variation.Annotations
err := controllerutil.SetControllerReference(symph, comp, c.client.Scheme())
if err != nil {
return false, fmt.Errorf("setting composition's controller: %w", err)
}
logger := logger.WithValues("compositionName", comp.Name, "compositionNamespace", comp.Namespace)
// Compose missing variations
idx := slices.IndexFunc(comps.Items, func(existing apiv1.Composition) bool {
return existing.Spec.Synthesizer.Name == variation.Synthesizer.Name
})
if idx == -1 {
err := c.noCacheClient.List(ctx, comps, client.InNamespace(symph.Namespace))
if err != nil {
return false, fmt.Errorf("listing existing compositions without cache: %w", err)
}
for _, cur := range comps.Items {
owner := metav1.GetControllerOf(&cur)
if owner != nil && owner.UID == symph.UID && cur.Spec.Synthesizer.Name == variation.Synthesizer.Name {
return false, fmt.Errorf("stale cache - composition already exists")
}
}
err = c.client.Create(ctx, comp)
if k8serrors.IsForbidden(err) && k8serrors.HasStatusCause(err, corev1.NamespaceTerminatingCause) {
logger.V(0).Info("skipping composition creation because the namespace is being terminated")
return false, nil
}
if err != nil {
return false, fmt.Errorf("creating composition: %w", err)
}
logger.V(0).Info("created composition for symphony")
return true, nil
}
// Diff and update if needed
existing := comps.Items[idx]
if equality.Semantic.DeepEqual(comp.Spec, existing.Spec) && !coalesceMetadata(&variation, &existing) {
continue // already matches
}
existing.Spec = comp.Spec
err = c.client.Update(ctx, &existing)
if err != nil {
return false, fmt.Errorf("updating existing composition: %w", err)
}
logger.V(0).Info("updated composition because its variation changed")
return true, nil
}
return false, nil
}
func (c *symphonyController) syncStatus(ctx context.Context, symph *apiv1.Symphony, comps *apiv1.CompositionList) error {
logger := logr.FromContextOrDiscard(ctx)
newStatus := c.buildStatus(symph, comps)
if equality.Semantic.DeepEqual(newStatus, symph.Status) {
return nil
}
copy := symph.DeepCopy()
copy.Status = newStatus
if err := c.client.Status().Patch(ctx, copy, client.MergeFrom(symph)); err != nil {
return fmt.Errorf("syncing status: %w", err)
}
logger.V(1).Info("sync'd symphony status", "ready", newStatus.Ready != nil, "reconciled", newStatus.Reconciled != nil, "synthesized", newStatus.Synthesized != nil)
return nil
}
func (c *symphonyController) buildStatus(symph *apiv1.Symphony, comps *apiv1.CompositionList) apiv1.SymphonyStatus {
newStatus := apiv1.SymphonyStatus{
ObservedGeneration: symph.Generation,
}
synthMap := map[string]struct{}{}
for _, comp := range comps.Items {
synthMap[comp.Spec.Synthesizer.Name] = struct{}{}
syn := comp.Status.CurrentSynthesis
if syn == nil {
continue
}
if newStatus.Ready.Before(syn.Ready) || newStatus.Ready == nil {
newStatus.Ready = syn.Ready
}
if newStatus.Reconciled.Before(syn.Reconciled) || newStatus.Reconciled == nil {
newStatus.Reconciled = syn.Reconciled
}
if newStatus.Synthesized.Before(syn.Synthesized) || newStatus.Synthesized == nil {
newStatus.Synthesized = syn.Synthesized
}
}
// Status should be nil for any states that haven't been reached by all compositions
for _, comp := range comps.Items {
syn := comp.Status.CurrentSynthesis
synInvalid := syn == nil || syn.ObservedCompositionGeneration != comp.Generation || comp.DeletionTimestamp != nil
if synInvalid || syn.Ready == nil {
newStatus.Ready = nil
}
if synInvalid || syn.Reconciled == nil {
newStatus.Reconciled = nil
}
if synInvalid || syn.Synthesized == nil {
newStatus.Synthesized = nil
}
}
return newStatus
}
// getBindings generates the bindings for a variation given it's symphony.
// Bindings specified by a variation take precedence over the symphony.
func getBindings(symph *apiv1.Symphony, vrn *apiv1.Variation) []apiv1.Binding {
res := append([]apiv1.Binding(nil), symph.Spec.Bindings...)
for _, bnd := range vrn.Bindings {
i := slices.IndexFunc(res, func(b apiv1.Binding) bool { return b.Key == bnd.Key })
if i >= 0 {
res[i] = bnd
} else {
res = append(res, bnd)
}
}
deduped := []apiv1.Binding{}
for i, bnd := range res {
j := slices.IndexFunc(res, func(b apiv1.Binding) bool { return b.Key == bnd.Key })
if i > j {
continue // duplicate
}
deduped = append(deduped, bnd)
}
return deduped
}
func getSynthesisEnv(symph *apiv1.Symphony, vrn *apiv1.Variation) []apiv1.EnvVar {
res := append([]apiv1.EnvVar(nil), vrn.SynthesisEnv...)
for _, evar := range symph.Spec.SynthesisEnv {
i := slices.IndexFunc(res, func(e apiv1.EnvVar) bool {
return evar.Name == e.Name
})
// Only use symhony var if the variation didn't specify it.
if i == -1 {
res = append(res, evar)
}
}
return res
}
func coalesceMetadata(variation *apiv1.Variation, existing *apiv1.Composition) bool {
var metaChanged bool
if existing.Labels == nil {
existing.Labels = map[string]string{}
}
for key, val := range variation.Labels {
if existing.Labels[key] != val {
metaChanged = true
}
existing.Labels[key] = val
}
if existing.Annotations == nil {
existing.Annotations = map[string]string{}
}
for key, val := range variation.Annotations {
if existing.Annotations[key] != val {
metaChanged = true
}
existing.Annotations[key] = val
}
return metaChanged
}