kubernetes/watcher.go

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package kubernetes

import (
	"context"
	"fmt"
	"time"

	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/metadata"

	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/runtime"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"

	"github.com/elastic/elastic-agent-libs/logp"
)

const (
	add    = "add"
	update = "update"
	delete = "delete"
)

var (
	accessor = meta.NewAccessor()
)

// Watcher watches Kubernetes resources events
type Watcher interface {
	// Start watching Kubernetes API for new events after resources were listed
	Start() error

	// Stop watching Kubernetes API for new events
	Stop()

	// AddEventHandler adds an event handler for the corresponding event type watched
	AddEventHandler(ResourceEventHandler)

	// GetEventHandler returns the event handlers for the corresponding event type watched
	GetEventHandler() ResourceEventHandler

	// Store returns the store object for the watcher
	Store() cache.Store

	// Client returns the kubernetes client object used by the watcher
	Client() kubernetes.Interface

	// CachedObject returns the old object before change during the last updated event
	CachedObject() runtime.Object
}

// WatchOptions controls watch behaviors
type WatchOptions struct {
	// SyncTimeout is a timeout for listing historical resources
	SyncTimeout time.Duration
	// Node is used for filtering watched resource to given node, use "" for all nodes
	Node string
	// Namespace is used for filtering watched resource to given namespace, use "" for all namespaces
	Namespace string
	// IsUpdated allows registering a func that allows the invoker of the Watch to decide what amounts to an update
	// vs what does not.
	IsUpdated func(old, new interface{}) bool
	// HonorReSyncs allows resync events to be requeued on the worker
	HonorReSyncs bool
}

type item struct {
	object    interface{}
	objectRaw interface{}
	state     string
}

type watcher struct {
	client       kubernetes.Interface
	informer     cache.SharedInformer
	store        cache.Store
	queue        workqueue.Interface
	ctx          context.Context
	stop         context.CancelFunc
	handler      ResourceEventHandler
	logger       *logp.Logger
	cachedObject runtime.Object
}
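// Illustrative sketch (caller-side, not part of this file): WatchOptions.IsUpdated can be
// overridden to narrow what counts as an update. For example, a caller interested only in
// spec changes could compare metadata.generation instead of resourceVersion. This assumes
// the caller imports metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" and that the watched
// objects implement metav1.Object; names and values below are placeholders.
//
//	opts := kubernetes.WatchOptions{
//		SyncTimeout: 10 * time.Minute,
//		Namespace:   "kube-system",
//		IsUpdated: func(old, new interface{}) bool {
//			o, okOld := old.(metav1.Object)
//			n, okNew := new.(metav1.Object)
//			if !okOld || !okNew {
//				return true // fall back to treating the event as an update
//			}
//			return o.GetGeneration() != n.GetGeneration()
//		},
//	}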
// NewWatcher initializes the watcher client to provide an events handler for
// resource from the cluster (filtered to the given node).
// Note: This watcher won't emit workqueue metrics. Use NewNamedWatcher to provide an explicit queue name.
func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) {
	return NewNamedWatcher("", client, resource, opts, indexers)
}

// NewNamedWatcher initializes the watcher client to provide an events handler for
// resource from the cluster (filtered to the given node) and also allows to name the k8s
// client's workqueue that is used by the watcher. Workqueue name is important for exposing workqueue
// metrics, if it is empty, its metrics will not be logged by the k8s client.
func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) {
	informer, _, err := NewInformer(client, resource, opts, indexers)
	if err != nil {
		return nil, err
	}
	return NewNamedWatcherWithInformer(name, client, resource, informer, opts)
}

// NewNamedWatcherWithInformer initializes the watcher client to provide an events handler for
// resource from the cluster (filtered to the given node) and also allows to name the k8s
// client's workqueue that is used by the watcher. Workqueue name is important for exposing workqueue
// metrics, if it is empty, its metrics will not be logged by the k8s client.
// This function requires the underlying informer to be passed by the caller.
func NewNamedWatcherWithInformer(
	name string,
	client kubernetes.Interface,
	resource Resource,
	informer cache.SharedInformer,
	opts WatchOptions,
) (Watcher, error) {
	var store cache.Store
	var queue workqueue.Interface
	var cachedObject runtime.Object

	store = informer.GetStore()
	queue = workqueue.NewNamed(name)

	if opts.IsUpdated == nil {
		opts.IsUpdated = func(o, n interface{}) bool {
			old, _ := accessor.ResourceVersion(o.(runtime.Object))
			new, _ := accessor.ResourceVersion(n.(runtime.Object))

			// Only enqueue changes that have different resource versions to avoid processing resyncs.
			return old != new
		}
	}

	ctx, cancel := context.WithCancel(context.TODO())
	w := &watcher{
		client:       client,
		informer:     informer,
		store:        store,
		queue:        queue,
		ctx:          ctx,
		cachedObject: cachedObject,
		stop:         cancel,
		logger:       logp.NewLogger("kubernetes"),
		handler:      NoOpEventHandlerFuncs{},
	}

	_, err := w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(o interface{}) {
			w.enqueue(o, add)
		},
		DeleteFunc: func(o interface{}) {
			w.enqueue(o, delete)
		},
		UpdateFunc: func(o, n interface{}) {
			if opts.IsUpdated(o, n) {
				w.enqueue(n, update)
			} else if opts.HonorReSyncs {
				// HonorReSyncs ensures that when the kubernetes client does a "resync", i.e. a full list of all
				// objects, autodiscover processes them. Why is this necessary? An effective control loop works
				// based on two state changes: a list and a watch. A watch is triggered each time the state of the system changes.
				// However, there is no guarantee that all events from a watch are processed by the receiver. To ensure that missed events
				// are properly handled, a periodic re-list is done to ensure that every state within the system is effectively handled.
				// In this case, we make sure to enqueue an "add" event because a runner that is already in Running
				// state should just be deduped by autodiscover and not stopped/started periodically as would be the case with an update.
				w.enqueue(n, add)
			}

			// We check the type of the resource and keep the cached object only if it is a namespace or a node.
			switch resource.(type) {
			case *Namespace:
				w.cacheObject(o)
			case *Node:
				w.cacheObject(o)
			}
		},
	})
	if err != nil {
		return nil, err
	}

	return w, nil
}
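// Illustrative usage sketch (hypothetical caller code, not part of this file): a named watcher
// over Pods scheduled on a single node. The Pod resource type and the ResourceEventHandlerFuncs
// helper are assumed to be provided elsewhere in this package; the node name and queue name are
// placeholders.
//
//	w, err := kubernetes.NewNamedWatcher("pod-watcher", client, &kubernetes.Pod{}, kubernetes.WatchOptions{
//		SyncTimeout: 10 * time.Minute,
//		Node:        "worker-0",
//	}, nil)
//	if err != nil {
//		return err
//	}
//	w.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
//		AddFunc:    func(obj interface{}) { /* object added or re-listed */ },
//		UpdateFunc: func(obj interface{}) { /* object changed */ },
//		DeleteFunc: func(obj interface{}) { /* object removed */ },
//	})
//	if err := w.Start(); err != nil {
//		return err
//	}
//	defer w.Stop()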
// NewMetadataWatcher initializes a metadata-only watcher client to provide an events handler for
// resource from the cluster (filtered to the given node).
// Event handlers defined on this watcher receive PartialObjectMetadata resources.
// Note: This watcher won't emit workqueue metrics. Use NewNamedWatcher to provide an explicit queue name.
func NewMetadataWatcher(
	client kubernetes.Interface,
	metadataClient metadata.Interface,
	gvr schema.GroupVersionResource,
	opts WatchOptions,
	indexers cache.Indexers,
	transformFunc cache.TransformFunc,
) (Watcher, error) {
	return NewNamedMetadataWatcher("", client, metadataClient, gvr, opts, indexers, transformFunc)
}

// NewNamedMetadataWatcher initializes a metadata-only watcher client to provide an events handler for
// resource from the cluster (filtered to the given node) and also allows to name the k8s
// client's workqueue that is used by the watcher. Workqueue name is important for exposing workqueue
// metrics, if it is empty, its metrics will not be logged by the k8s client.
// Event handlers defined on this watcher receive PartialObjectMetadata resources.
func NewNamedMetadataWatcher(
	name string,
	client kubernetes.Interface,
	metadataClient metadata.Interface,
	gvr schema.GroupVersionResource,
	opts WatchOptions,
	indexers cache.Indexers,
	transformFunc cache.TransformFunc,
) (Watcher, error) {
	informer := NewMetadataInformer(metadataClient, gvr, opts, indexers)

	if transformFunc != nil {
		err := informer.SetTransform(transformFunc)
		if err != nil {
			return nil, err
		}
	}

	return NewNamedWatcherWithInformer(name, client, &v1.PartialObjectMetadata{}, informer, opts)
}

// AddEventHandler adds a resource handler to process each request that is coming into the watcher
func (w *watcher) AddEventHandler(h ResourceEventHandler) {
	w.handler = h
}

// GetEventHandler returns the watcher's event handler
func (w *watcher) GetEventHandler() ResourceEventHandler {
	return w.handler
}

// Store returns the store object for the resource that is being watched
func (w *watcher) Store() cache.Store {
	return w.store
}

// Client returns the kubernetes client object used by the watcher
func (w *watcher) Client() kubernetes.Interface {
	return w.client
}

// CachedObject returns the old object in cache during the last updated event
func (w *watcher) CachedObject() runtime.Object {
	return w.cachedObject
}

// Start watching pods
func (w *watcher) Start() error {
	go w.informer.Run(w.ctx.Done())

	if !cache.WaitForCacheSync(w.ctx.Done(), w.informer.HasSynced) {
		return fmt.Errorf("kubernetes informer unable to sync cache")
	}

	w.logger.Debugf("cache sync done")

	// Wrap the process function with wait.Until so that if the controller crashes, it starts up again after a second.
	go wait.Until(func() {
		for w.process(w.ctx) {
		}
	}, time.Second*1, w.ctx.Done())

	return nil
}

func (w *watcher) Stop() {
	w.queue.ShutDown()
	w.stop()
}
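// Note on the queue/key flow implemented below: enqueue records a namespace/name key (for
// example "kube-system/coredns-565d847f94-abcde", an illustrative name), the raw object, and
// the event state. process later resolves that key against the informer store so handlers
// always see the most recent copy of the object; the one exception is a delete for an object
// that is no longer in the store, where the raw object captured at enqueue time is handed to
// OnDelete so downstream state can still be cleaned up.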
// enqueue takes the most recent object that was received, figures out the namespace/name of the object
// and adds it to the work queue for processing.
func (w *watcher) enqueue(obj interface{}, state string) {
	// DeletionHandlingMetaNamespaceKeyFunc ensures that we get a key only if the resource's state is not Unknown.
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		return
	}

	if deleted, ok := obj.(cache.DeletedFinalStateUnknown); ok {
		w.logger.Debugf("Enqueued DeletedFinalStateUnknown contained object: %+v", deleted.Obj)
		obj = deleted.Obj
	}

	w.queue.Add(&item{key, obj, state})
}

// cacheObject updates the watcher with the old version of the cached object before it changed during an update event
func (w *watcher) cacheObject(o interface{}) {
	if old, ok := o.(runtime.Object); !ok {
		utilruntime.HandleError(fmt.Errorf("expected object in cache got %#v", o))
	} else {
		w.cachedObject = old
	}
}

// process gets the top of the work queue and processes the object that is received.
func (w *watcher) process(_ context.Context) bool {
	obj, quit := w.queue.Get()
	if quit {
		return false
	}
	defer w.queue.Done(obj)

	var entry *item
	var ok bool
	if entry, ok = obj.(*item); !ok {
		utilruntime.HandleError(fmt.Errorf("expected *item in workqueue but got %#v", obj))
		return true
	}

	key, ok := entry.object.(string)
	if !ok {
		return false
	}

	o, exists, err := w.store.GetByKey(key)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("getting object %#v from cache: %w", obj, err))
		return true
	}
	if !exists {
		if entry.state == delete {
			w.logger.Debugf("Object %+v was not found in the store, deleting anyway!", key)
			// delete anyway in order to clean states
			w.handler.OnDelete(entry.objectRaw)
		}
		return true
	}

	switch entry.state {
	case add:
		w.handler.OnAdd(o)
	case update:
		w.handler.OnUpdate(o)
	case delete:
		w.handler.OnDelete(o)
	}

	return true
}
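// Illustrative sketch (hypothetical caller code, not part of this file): a metadata-only watcher
// for Jobs, useful when only labels and annotations matter, since the informer then caches
// PartialObjectMetadata instead of full objects. metadataClient would typically be built with
// metadata.NewForConfig from "k8s.io/client-go/metadata"; the queue name, GVR, and handler are
// placeholders chosen for the example.
//
//	gvr := schema.GroupVersionResource{Group: "batch", Version: "v1", Resource: "jobs"}
//	mw, err := kubernetes.NewNamedMetadataWatcher(
//		"job-metadata-watcher",
//		client,
//		metadataClient,
//		gvr,
//		kubernetes.WatchOptions{SyncTimeout: 10 * time.Minute},
//		nil, // indexers
//		nil, // transformFunc
//	)
//	if err != nil {
//		return err
//	}
//	mw.AddEventHandler(handler)
//	if err := mw.Start(); err != nil {
//		return err
//	}
//	defer mw.Stop()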