internal/manager/manager.go:
package manager
import (
"context"
"errors"
"fmt"
"math/rand/v2"
"os"
"strconv"

"net/http"
_ "net/http/pprof"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/config"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

apiv1 "github.com/Azure/eno/api/v1"
)
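// Configure the global controller-runtime logger to write zap-formatted logs to stdout before anything else uses it.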
func init() {
log.SetLogger(zap.New(zap.WriteTo(os.Stdout)))
}
// IMPORTANT: There are several things to know about how controller-runtime is configured:
// - Resource slices are only watched by the reconciler process to avoid the cost of watching all of them in the controller
// - Resource slices are not deep copied when reading from the informer - do not mutate them
// - The resource slices cached by the informer do not have the configured manifests since they are held by the reconstitution cache anyway
const (
ManagerLabelKey = "app.kubernetes.io/managed-by"
ManagerLabelValue = "eno"
)
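// Serve pprof when PPROF_ADDR is set. http.ListenAndServe blocks while the listener is healthy, so the goroutine only panics if it fails.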
func init() {
go func() {
if addr := os.Getenv("PPROF_ADDR"); addr != "" {
err := http.ListenAndServe(addr, nil)
panic(fmt.Sprintf("unable to serve pprof listener: %s", err))
}
}()
}
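// New returns a manager configured for the controller process: it caches Eno-managed synthesizer pods and registers the composition and synthesizer field indexes.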
func New(logger logr.Logger, opts *Options) (ctrl.Manager, error) {
return newMgr(logger, opts, true, false)
}
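// NewReconciler returns a manager configured for the reconciler process, which skips the controller-only pod cache and field indexes.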
func NewReconciler(logger logr.Logger, opts *Options) (ctrl.Manager, error) {
return newMgr(logger, opts, false, true)
}
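// NewTest returns a manager with both controller and reconciler configuration enabled, for use in tests.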
func NewTest(logger logr.Logger, opts *Options) (ctrl.Manager, error) {
return newMgr(logger, opts, true, true)
}
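// newMgr assembles the shared manager configuration: scheme registration, leader election, metrics and health probe endpoints, scoped caches, and optional chaos fault injection.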
func newMgr(logger logr.Logger, opts *Options, isController, isReconciler bool) (ctrl.Manager, error) {
opts.Rest.QPS = float32(opts.qps)
scheme := runtime.NewScheme()
err := apiv1.SchemeBuilder.AddToScheme(scheme)
if err != nil {
return nil, err
}
err = corev1.SchemeBuilder.AddToScheme(scheme)
if err != nil {
return nil, err
}
mgrOpts := manager.Options{
Logger: logger,
HealthProbeBindAddress: opts.HealthProbeAddr,
Scheme: scheme,
Metrics: server.Options{
BindAddress: opts.MetricsAddr,
},
BaseContext: func() context.Context {
return logr.NewContext(context.Background(), logger)
},
Cache: cache.Options{
ByObject: make(map[client.Object]cache.ByObject),
},
LeaderElection: opts.LeaderElection,
LeaderElectionNamespace: opts.LeaderElectionNamespace,
LeaderElectionResourceLock: opts.LeaderElectionResourceLock,
LeaderElectionID: opts.LeaderElectionID,
LeaseDuration: &opts.ElectionLeaseDuration,
RenewDeadline: &opts.ElectionLeaseRenewDeadline,
RetryPeriod: &opts.ElectionLeaseRetryPeriod,
LeaderElectionReleaseOnCancel: true,
Controller: config.Controller{SkipNameValidation: ptr.To(true)},
}
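// When CHAOS_RATIO is set, wrap the default client in a chaosClient so that roughly that fraction of write operations fail (e.g. CHAOS_RATIO=0.2 fails ~20% of writes).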
if ratioStr := os.Getenv("CHAOS_RATIO"); ratioStr != "" {
mgrOpts.NewClient = func(config *rest.Config, options client.Options) (client.Client, error) {
base, err := client.New(config, options)
if err != nil {
return nil, err
}
ratio, err := strconv.ParseFloat(ratioStr, 64)
if err != nil {
return nil, err
}
return &chaosClient{Client: base, ratio: ratio}, nil
}
}
if isController {
// Only cache pods in the synthesizer pod namespace and owned by this controller
mgrOpts.Cache.ByObject[&corev1.Pod{}] = cache.ByObject{
Namespaces: map[string]cache.Config{
opts.SynthesizerPodNamespace: {
LabelSelector: labels.SelectorFromSet(labels.Set{ManagerLabelKey: ManagerLabelValue}),
},
},
}
}
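// Cache compositions using the configured namespace and selector. Resource slices are cached without deep copies and with their manifests stripped to keep the informer footprint small (see the note above the consts).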
mgrOpts.Cache.ByObject[&apiv1.Composition{}] = newCacheOptions(opts.CompositionNamespace, opts.CompositionSelector)
sliceCacheOpts := newCacheOptions(opts.CompositionNamespace, labels.Everything())
sliceCacheOpts.UnsafeDisableDeepCopy = ptr.To(true)
sliceCacheOpts.Transform = func(obj any) (any, error) {
slice, ok := obj.(*apiv1.ResourceSlice)
if !ok {
return obj, nil
}
for i := range slice.Spec.Resources {
slice.Spec.Resources[i].Manifest = "" // remove big manifest that we don't need
}
return slice, nil
}
mgrOpts.Cache.ByObject[&apiv1.ResourceSlice{}] = sliceCacheOpts
mgr, err := ctrl.NewManager(opts.Rest, mgrOpts)
if err != nil {
return nil, err
}
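// Field indexes back the controllers' lookups: compositions by synthesizer, symphony, and resource binding, and synthesizers by ref.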
if isController {
err = mgr.GetFieldIndexer().IndexField(context.Background(), &apiv1.Composition{}, IdxCompositionsBySynthesizer, func(o client.Object) []string {
comp := o.(*apiv1.Composition)
return []string{comp.Spec.Synthesizer.Name}
})
if err != nil {
return nil, err
}
err = mgr.GetFieldIndexer().IndexField(context.Background(), &apiv1.Composition{}, IdxCompositionsBySymphony, indexController())
if err != nil {
return nil, err
}
err = mgr.GetFieldIndexer().IndexField(context.Background(), &apiv1.Composition{}, IdxCompositionsByBinding, indexResourceBindings())
if err != nil {
return nil, err
}
err = mgr.GetFieldIndexer().IndexField(context.Background(), &apiv1.Synthesizer{}, IdxSynthesizersByRef, indexSynthRefs())
if err != nil {
return nil, err
}
}
if err := mgr.AddHealthzCheck("ping", healthz.Ping); err != nil {
return nil, err
}
if err := mgr.AddReadyzCheck("ping", healthz.Ping); err != nil {
return nil, err
}
return mgr, nil
}
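// NewLogConstructor returns a per-request log constructor that tags the manager's logger with the controller name.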
func NewLogConstructor(mgr ctrl.Manager, controllerName string) func(*reconcile.Request) logr.Logger {
return NewTypedLogConstructor[*reconcile.Request](mgr, controllerName)
}
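// NewTypedLogConstructor is the generic form of NewLogConstructor for typed reconcile requests.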
func NewTypedLogConstructor[T any](mgr ctrl.Manager, controllerName string) func(T) logr.Logger {
return func(req T) logr.Logger {
l := mgr.GetLogger().WithValues("controller", controllerName)
return l
}
}
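// SingleEventHandler maps every event to a single empty reconcile.Request, for controllers that reconcile one global state regardless of which object changed.
// Illustrative (hypothetical) usage when wiring a controller:
//
//	ctrl.NewControllerManagedBy(mgr).
//		For(&apiv1.Synthesizer{}).
//		Watches(&apiv1.Composition{}, SingleEventHandler()).
//		Complete(r)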
func SingleEventHandler() handler.EventHandler {
return handler.EnqueueRequestsFromMapFunc(handler.MapFunc(func(ctx context.Context, o client.Object) []reconcile.Request {
return []reconcile.Request{{}}
}))
}
type chaosClient struct {
client.Client
ratio float64
}
func (c *chaosClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error {
if c.ratio > rand.Float64() {
return errors.New("chaos!")
}
return c.Client.Create(ctx, obj, opts...)
}
func (c *chaosClient) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error {
if c.ratio > rand.Float64() {
return errors.New("chaos!")
}
return c.Client.Delete(ctx, obj, opts...)
}
func (c *chaosClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error {
if c.ratio > rand.Float64() {
return errors.New("chaos!")
}
return c.Client.Update(ctx, obj, opts...)
}
func (c *chaosClient) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error {
if c.ratio > rand.Float64() {
return errors.New("chaos!")
}
return c.Client.Patch(ctx, obj, patch, opts...)
}
func (c *chaosClient) Status() client.SubResourceWriter {
return &chaosStatusClient{SubResourceWriter: c.Client.Status(), parent: c}
}
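// chaosStatusClient applies the parent client's failure ratio to status subresource writes.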
type chaosStatusClient struct {
client.SubResourceWriter
parent *chaosClient
}
func (c *chaosStatusClient) Update(ctx context.Context, obj client.Object, opts ...client.SubResourceUpdateOption) error {
if c.parent.ratio > rand.Float64() {
return errors.New("chaos!")
}
return c.SubResourceWriter.Update(ctx, obj, opts...)
}
func (c *chaosStatusClient) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.SubResourcePatchOption) error {
if c.parent.ratio > rand.Float64() {
return errors.New("chaos!")
}
return c.SubResourceWriter.Patch(ctx, obj, patch, opts...)
}