in otelcollector/otel-allocator/internal/watcher/promOperator.go [220:309]
// Watch starts the namespace informer (when one is configured) and every
// resource informer in w.informers, registers event handlers that translate
// informer activity into notifications, and hands those notifications to
// w.rateLimitedEventSender for delivery on upstreamEvents.
//
// A resource informer whose cache does not sync is logged and skipped. If the
// namespace cache does not sync, Watch returns an error after the resource
// informers have been started and before any event is forwarded. Otherwise it
// waits on w.stopChannel once rateLimitedEventSender returns and then returns
// nil. upstreamErrors is not used by this method.
func (w *PrometheusCRWatcher) Watch(upstreamEvents chan Event, upstreamErrors chan error) error {
	success := true
	// this channel needs to be buffered because notifications are asynchronous and neither producers nor consumers wait
	notifyEvents := make(chan struct{}, 1)

	// notify queues a notification unless one is already pending. The send is
	// non-blocking so informer callbacks never stall while outgoing events are
	// being rate-limited.
	notify := func() {
		select {
		case notifyEvents <- struct{}{}:
		default:
		}
	}

	if w.nsInformer != nil {
		go w.nsInformer.Run(w.stopChannel)
		if ok := w.WaitForNamedCacheSync("namespace", w.nsInformer.HasSynced); !ok {
			success = false
		}

		_, err := w.nsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
			UpdateFunc: func(oldObj, newObj interface{}) {
				old := oldObj.(*v1.Namespace)
				cur := newObj.(*v1.Namespace)

				// Periodic resync may resend the Namespace without changes
				// in-between.
				if old.ResourceVersion == cur.ResourceVersion {
					return
				}

				for name, selector := range map[string]*metav1.LabelSelector{
					"PodMonitorNamespaceSelector":     w.podMonitorNamespaceSelector,
					"ServiceMonitorNamespaceSelector": w.serviceMonitorNamespaceSelector,
					"ProbeNamespaceSelector":          w.probeNamespaceSelector,
					"ScrapeConfigNamespaceSelector":   w.scrapeConfigNamespaceSelector,
				} {
					sync, err := k8sutil.LabelSelectionHasChanged(old.Labels, cur.Labels, selector)
					if err != nil {
						w.logger.Error("Failed to check label selection between namespaces while handling namespace updates", "selector", name, "error", err)
						// Map iteration order is random; keep evaluating the
						// remaining selectors so that one failing selector
						// cannot hide a real change on another one.
						continue
					}

					if sync {
						notify()
						return
					}
				}
			},
		})
		if err != nil {
			// Without this handler namespace label changes never trigger an
			// update, so surface the failure instead of discarding it.
			w.logger.Error("Failed to register namespace event handler", "error", err)
		}
	} else {
		w.logger.Info("Unable to watch namespaces since namespace informer is nil")
	}

	for name, resource := range w.informers {
		resource.Start(w.stopChannel)

		if ok := w.WaitForNamedCacheSync(name, resource.HasSynced); !ok {
			w.logger.Info("skipping informer", "informer", name)
			continue
		}

		// Every add, update and delete only signals that something changed;
		// notify drops the signal if one is already queued.
		resource.AddEventHandler(cache.ResourceEventHandlerFuncs{
			AddFunc:    func(interface{}) { notify() },
			UpdateFunc: func(_, _ interface{}) { notify() },
			DeleteFunc: func(interface{}) { notify() },
		})
	}

	if !success {
		return fmt.Errorf("failed to sync one of the caches")
	}

	// limit the rate of outgoing events
	w.rateLimitedEventSender(upstreamEvents, notifyEvents)

	<-w.stopChannel
	return nil
}