internal/controllers/watch/kind.go (238 lines of code) (raw):
package watch
import (
"context"
"fmt"
"math/rand"
"path"
"reflect"
"slices"
"time"
apiv1 "github.com/Azure/eno/api/v1"
"github.com/Azure/eno/internal/manager"
"github.com/Azure/eno/internal/resource"
"github.com/go-logr/logr"
"golang.org/x/time/rate"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
type KindWatchController struct {
client client.Client
gvk schema.GroupVersionKind
cancel context.CancelFunc
}
func NewKindWatchController(ctx context.Context, parent *WatchController, resource *apiv1.ResourceRef) (*KindWatchController, error) {
logger := logr.FromContextOrDiscard(ctx).WithValues("group", resource.Group, "version", resource.Version, "kind", resource.Kind)
k := &KindWatchController{
client: parent.mgr.GetClient(),
gvk: schema.GroupVersionKind{
Group: resource.Group,
Version: resource.Version,
Kind: resource.Kind,
},
}
ref := &metav1.PartialObjectMetadata{}
ref.SetGroupVersionKind(k.gvk)
rrc, err := k.newResourceWatchController(parent, ref)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(ctx)
k.cancel = cancel
go func() {
logger.V(1).Info("starting kind watch controller")
rrc.Start(ctx)
logger.V(1).Info("kind watch controller stopped")
}()
return k, nil
}
func (k *KindWatchController) newResourceWatchController(parent *WatchController, ref *metav1.PartialObjectMetadata) (controller.Controller, error) {
rrc, err := controller.NewUnmanaged("kindWatchController", parent.mgr, controller.Options{
LogConstructor: manager.NewLogConstructor(parent.mgr, "kindWatchController"),
RateLimiter: &workqueue.TypedBucketRateLimiter[reconcile.Request]{
// Be careful about feedback loops - low, hardcoded rate limits make sense here.
// Maybe expose as a flag in the future.
Limiter: rate.NewLimiter(rate.Every(time.Second), 2),
},
Reconciler: k,
})
if err != nil {
return nil, err
}
// Watch the input resources
err = rrc.Watch(source.Kind(parent.mgr.GetCache(), ref, &handler.TypedEnqueueRequestForObject[*metav1.PartialObjectMetadata]{}))
if err != nil {
return nil, err
}
// Watch inputs declared by refs/bindings in synthesizers/compositions
err = rrc.Watch(source.Kind(parent.mgr.GetCache(), &apiv1.Composition{},
handler.TypedEnqueueRequestsFromMapFunc(handler.TypedMapFunc[*apiv1.Composition, reconcile.Request](func(ctx context.Context, comp *apiv1.Composition) []reconcile.Request {
if comp.Spec.Synthesizer.Name == "" {
return nil
}
synth := &apiv1.Synthesizer{}
err = parent.client.Get(ctx, types.NamespacedName{Name: comp.Spec.Synthesizer.Name}, synth)
if err != nil {
logr.FromContextOrDiscard(ctx).Error(err, "unable to get synthesizer for composition")
return nil
}
return k.buildRequests(synth, *comp)
}))))
if err != nil {
return nil, err
}
err = rrc.Watch(source.Kind(parent.mgr.GetCache(), &apiv1.Synthesizer{},
handler.TypedEnqueueRequestsFromMapFunc(handler.TypedMapFunc[*apiv1.Synthesizer, reconcile.Request](func(ctx context.Context, synth *apiv1.Synthesizer) []reconcile.Request {
compList := &apiv1.CompositionList{}
err = parent.client.List(ctx, compList, client.MatchingFields{
manager.IdxCompositionsBySynthesizer: synth.Name,
})
if err != nil {
logr.FromContextOrDiscard(ctx).Error(err, "unable to get compositions for synthesizer")
return nil
}
return k.buildRequests(synth, compList.Items...)
}))))
if err != nil {
return nil, err
}
return rrc, err
}
// buildRequests returns a reconcile request for every binding to this resource kind.
func (k *KindWatchController) buildRequests(synth *apiv1.Synthesizer, comps ...apiv1.Composition) []reconcile.Request {
keys := map[string]struct{}{}
reqs := []reconcile.Request{}
for _, ref := range synth.Spec.Refs {
if ref.Resource.Name == "" {
keys[ref.Key] = struct{}{}
continue // ref does not have an "implicit" binding
}
nsn := types.NamespacedName{Namespace: ref.Resource.Namespace, Name: ref.Resource.Name}
req := reconcile.Request{NamespacedName: nsn}
if !slices.Contains(reqs, req) {
reqs = append(reqs, req)
}
}
for _, comp := range comps {
for _, binding := range comp.Spec.Bindings {
if _, found := keys[binding.Key]; !found {
continue
}
nsn := types.NamespacedName{Namespace: binding.Resource.Namespace, Name: binding.Resource.Name}
req := reconcile.Request{NamespacedName: nsn}
if !slices.Contains(reqs, req) {
reqs = append(reqs, req)
}
}
}
return reqs
}
func (k *KindWatchController) Stop(ctx context.Context) {
logger := logr.FromContextOrDiscard(ctx)
if k.cancel != nil {
k.cancel()
}
logger.V(1).Info("stopping kind watch controller")
}
func (k *KindWatchController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := logr.FromContextOrDiscard(ctx).WithValues("group", k.gvk.Group, "version", k.gvk.Version, "kind", k.gvk.Kind)
meta := &metav1.PartialObjectMetadata{}
meta.SetGroupVersionKind(k.gvk)
err := k.client.Get(ctx, req.NamespacedName, meta)
if err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
list := &apiv1.SynthesizerList{}
err = k.client.List(ctx, list, client.MatchingFields{
manager.IdxSynthesizersByRef: path.Join(k.gvk.Group, k.gvk.Version, k.gvk.Kind),
})
if err != nil {
return ctrl.Result{}, fmt.Errorf("listing synthesizers: %w", err)
}
rand.Shuffle(len(list.Items), func(i, j int) { list.Items[i], list.Items[j] = list.Items[j], list.Items[i] })
for _, synth := range list.Items {
for _, ref := range synth.Spec.Refs {
if ref.Resource.Name != meta.GetName() || ref.Resource.Namespace != meta.GetNamespace() || ref.Resource.Group != k.gvk.Group || ref.Resource.Kind != k.gvk.Kind || ref.Resource.Version != k.gvk.Version {
continue
}
list := &apiv1.CompositionList{}
err = k.client.List(ctx, list, client.MatchingFields{
manager.IdxCompositionsBySynthesizer: synth.Name,
})
if err != nil {
return ctrl.Result{}, fmt.Errorf("listing compositions: %w", err)
}
modified, err := k.updateCompositions(ctx, logger, &synth, meta, list)
if modified || err != nil {
return ctrl.Result{}, err
}
}
list := &apiv1.CompositionList{}
err = k.client.List(ctx, list, client.MatchingFields{
manager.IdxCompositionsByBinding: path.Join(synth.Name, meta.Namespace, meta.Name),
})
if err != nil {
return ctrl.Result{}, fmt.Errorf("listing compositions: %w", err)
}
modified, err := k.updateCompositions(ctx, logger, &synth, meta, list)
if modified || err != nil {
return ctrl.Result{}, err
}
}
return ctrl.Result{}, nil
}
func (k *KindWatchController) updateCompositions(ctx context.Context, logger logr.Logger, synth *apiv1.Synthesizer, meta *metav1.PartialObjectMetadata, list *apiv1.CompositionList) (bool, error) {
for _, comp := range list.Items {
key := findRefKey(&comp, synth, meta)
if key == "" {
logger.V(1).Info("no matching input key found for resource")
continue
}
revs := resource.NewInputRevisions(meta, key)
if !setInputRevisions(&comp, revs) {
continue
}
// TODO: Reduce risk of conflict errors here
err := k.client.Status().Update(ctx, &comp)
if err != nil {
return false, fmt.Errorf("updating input revisions: %w", err)
}
logger.V(0).Info("noticed input resource change", "compositionName", comp.Name, "compositionNamespace", comp.Namespace, "ref", key)
return true, nil // wait for requeue
}
return false, nil
}
func findRefKey(comp *apiv1.Composition, synth *apiv1.Synthesizer, meta *metav1.PartialObjectMetadata) string {
var bindingKey string
for _, binding := range comp.Spec.Bindings {
if binding.Resource.Name == meta.GetName() && binding.Resource.Namespace == meta.GetNamespace() {
bindingKey = binding.Key
break
}
}
for _, ref := range synth.Spec.Refs {
gvk := meta.GetObjectKind().GroupVersionKind()
matchesGVK := ref.Resource.Group == gvk.Group && ref.Resource.Version == gvk.Version && ref.Resource.Kind == gvk.Kind
matchesKey := bindingKey == ref.Key
matchesNSN := ref.Resource.Name == meta.GetName() && ref.Resource.Namespace == meta.GetNamespace()
if matchesGVK && (matchesKey || matchesNSN) {
return ref.Key
}
}
return ""
}
func setInputRevisions(comp *apiv1.Composition, revs *apiv1.InputRevisions) bool {
for i, ir := range comp.Status.InputRevisions {
if ir.Key != revs.Key {
continue
}
if reflect.DeepEqual(ir, *revs) {
return false
}
comp.Status.InputRevisions[i] = *revs
return true
}
comp.Status.InputRevisions = append(comp.Status.InputRevisions, *revs)
return true
}