in otelcollector/otel-allocator/internal/collector/collector.go [56:98]
func (k *Watcher) Watch(
collectorNamespace string,
labelSelector *metav1.LabelSelector,
fn func(collectors map[string]*allocation.Collector),
) error {
selector, err := metav1.LabelSelectorAsSelector(labelSelector)
if err != nil {
return err
}
listOptionsFunc := func(listOptions *metav1.ListOptions) {
listOptions.LabelSelector = selector.String()
}
informerFactory := informers.NewSharedInformerFactoryWithOptions(
k.k8sClient,
time.Second*30,
informers.WithNamespace(collectorNamespace),
informers.WithTweakListOptions(listOptionsFunc))
informer := informerFactory.Core().V1().Pods().Informer()
notify := make(chan struct{}, 1)
go k.rateLimitedCollectorHandler(notify, informer.GetStore(), fn)
notifyFunc := func(_ interface{}) {
select {
case notify <- struct{}{}:
default:
}
}
_, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: notifyFunc,
UpdateFunc: func(oldObj, newObj interface{}) {
notifyFunc(newObj)
},
DeleteFunc: notifyFunc,
})
if err != nil {
return err
}
informer.Run(k.close)
return nil
}