in kubernetes/watcher.go [132:204]
func NewNamedWatcherWithInformer(
name string,
client kubernetes.Interface,
resource Resource,
informer cache.SharedInformer,
opts WatchOptions,
) (Watcher, error) {
var store cache.Store
var queue workqueue.Interface
var cachedObject runtime.Object
store = informer.GetStore()
queue = workqueue.NewNamed(name)
if opts.IsUpdated == nil {
opts.IsUpdated = func(o, n interface{}) bool {
old, _ := accessor.ResourceVersion(o.(runtime.Object))
new, _ := accessor.ResourceVersion(n.(runtime.Object))
// Only enqueue changes that have a different resource versions to avoid processing resyncs.
return old != new
}
}
ctx, cancel := context.WithCancel(context.TODO())
w := &watcher{
client: client,
informer: informer,
store: store,
queue: queue,
ctx: ctx,
cachedObject: cachedObject,
stop: cancel,
logger: logp.NewLogger("kubernetes"),
handler: NoOpEventHandlerFuncs{},
}
_, err := w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
w.enqueue(o, add)
},
DeleteFunc: func(o interface{}) {
w.enqueue(o, delete)
},
UpdateFunc: func(o, n interface{}) {
if opts.IsUpdated(o, n) {
w.enqueue(n, update)
} else if opts.HonorReSyncs {
// HonorReSyncs ensure that at the time when the kubernetes client does a "resync", i.e, a full list of all
// objects we make sure that autodiscover processes them. Why is this necessary? An effective control loop works
// based on two state changes, a list and a watch. A watch is triggered each time the state of the system changes.
// However, there is no guarantee that all events from a watch are processed by the receiver. To ensure that missed events
// are properly handled, a period re-list is done to ensure that every state within the system is effectively handled.
// In this case, we are making sure that we are enqueueing an "add" event because, an runner that is already in Running
// state should just be deduped by autodiscover and not stop/started periodically as would be the case with an update.
w.enqueue(n, add)
}
//We check the type of resource and only if it is namespace or node return the cacheObject
switch resource.(type) {
case *Namespace:
w.cacheObject(o)
case *Node:
w.cacheObject(o)
}
},
})
if err != nil {
return nil, err
}
return w, nil
}