func NewNamedWatcherWithInformer()

in kubernetes/watcher.go [132:204]


func NewNamedWatcherWithInformer(
	name string,
	client kubernetes.Interface,
	resource Resource,
	informer cache.SharedInformer,
	opts WatchOptions,
) (Watcher, error) {
	var store cache.Store
	var queue workqueue.Interface
	var cachedObject runtime.Object

	store = informer.GetStore()
	queue = workqueue.NewNamed(name)

	if opts.IsUpdated == nil {
		opts.IsUpdated = func(o, n interface{}) bool {
			old, _ := accessor.ResourceVersion(o.(runtime.Object))
			new, _ := accessor.ResourceVersion(n.(runtime.Object))

			// Only enqueue changes that have a different resource versions to avoid processing resyncs.
			return old != new
		}
	}

	ctx, cancel := context.WithCancel(context.TODO())
	w := &watcher{
		client:       client,
		informer:     informer,
		store:        store,
		queue:        queue,
		ctx:          ctx,
		cachedObject: cachedObject,
		stop:         cancel,
		logger:       logp.NewLogger("kubernetes"),
		handler:      NoOpEventHandlerFuncs{},
	}

	_, err := w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(o interface{}) {
			w.enqueue(o, add)
		},
		DeleteFunc: func(o interface{}) {
			w.enqueue(o, delete)
		},
		UpdateFunc: func(o, n interface{}) {
			if opts.IsUpdated(o, n) {
				w.enqueue(n, update)
			} else if opts.HonorReSyncs {
				// HonorReSyncs ensure that at the time when the kubernetes client does a "resync", i.e, a full list of all
				// objects we make sure that autodiscover processes them. Why is this necessary? An effective control loop works
				// based on two state changes, a list and a watch. A watch is triggered each time the state of the system changes.
				// However, there is no guarantee that all events from a watch are processed by the receiver. To ensure that missed events
				// are properly handled, a period re-list is done to ensure that every state within the system is effectively handled.
				// In this case, we are making sure that we are enqueueing an "add" event because, an runner that is already in Running
				// state should just be deduped by autodiscover and not stop/started periodically as would be the case with an update.
				w.enqueue(n, add)
			}

			//We check the type of resource and only if it is namespace or node return the cacheObject
			switch resource.(type) {
			case *Namespace:
				w.cacheObject(o)
			case *Node:
				w.cacheObject(o)
			}
		},
	})
	if err != nil {
		return nil, err
	}

	return w, nil
}