otelcollector/otel-allocator/internal/collector/collector.go (135 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package collector import ( "time" "github.com/go-logr/logr" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/internal/allocation" ) const ( defaultMinUpdateInterval = time.Second * 5 ) var ( collectorsDiscovered = promauto.NewGauge(prometheus.GaugeOpts{ Name: "opentelemetry_allocator_collectors_discovered", Help: "Number of collectors discovered.", }) ) type Watcher struct { log logr.Logger k8sClient kubernetes.Interface close chan struct{} minUpdateInterval time.Duration collectorNotReadyGracePeriod time.Duration } func NewCollectorWatcher(logger logr.Logger, kubeConfig *rest.Config, collectorNotReadyGracePeriod time.Duration) (*Watcher, error) { clientset, err := kubernetes.NewForConfig(kubeConfig) if err != nil { return &Watcher{}, err } return &Watcher{ log: logger.WithValues("component", "opentelemetry-targetallocator"), k8sClient: clientset, close: make(chan struct{}), minUpdateInterval: defaultMinUpdateInterval, collectorNotReadyGracePeriod: collectorNotReadyGracePeriod, }, nil } func (k *Watcher) Watch( collectorNamespace string, labelSelector *metav1.LabelSelector, fn func(collectors map[string]*allocation.Collector), ) error { selector, err := metav1.LabelSelectorAsSelector(labelSelector) if err != nil { return err } listOptionsFunc := func(listOptions *metav1.ListOptions) { listOptions.LabelSelector = selector.String() } informerFactory := informers.NewSharedInformerFactoryWithOptions( k.k8sClient, time.Second*30, informers.WithNamespace(collectorNamespace), informers.WithTweakListOptions(listOptionsFunc)) informer := informerFactory.Core().V1().Pods().Informer() notify := make(chan struct{}, 1) go k.rateLimitedCollectorHandler(notify, informer.GetStore(), fn) notifyFunc := func(_ interface{}) { select { case notify <- struct{}{}: default: } } _, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: notifyFunc, UpdateFunc: func(oldObj, newObj interface{}) { notifyFunc(newObj) }, DeleteFunc: notifyFunc, }) if err != nil { return err } informer.Run(k.close) return nil } // rateLimitedCollectorHandler runs fn on collectors present in the store whenever it gets a notification on the notify channel, // but not more frequently than once per k.eventPeriod. func (k *Watcher) rateLimitedCollectorHandler(notify chan struct{}, store cache.Store, fn func(collectors map[string]*allocation.Collector)) { ticker := time.NewTicker(k.minUpdateInterval) defer ticker.Stop() for { select { case <-k.close: return case <-ticker.C: // throttle events to avoid excessive updates select { case <-notify: k.runOnCollectors(store, fn) default: } } } } // runOnCollectors runs the provided function on the set of collectors from the Store. func (k *Watcher) runOnCollectors(store cache.Store, fn func(collectors map[string]*allocation.Collector)) { objects := store.List() collectorMap := make(map[string]*allocation.Collector, len(objects)) for _, obj := range objects { pod := obj.(*v1.Pod) if pod.Spec.NodeName == "" { continue } // pod healthiness check will always be disabled if CollectorNotReadyGracePeriod is set to 0 * time.Second if k.isPodUnhealthy(pod, k.collectorNotReadyGracePeriod) { continue } collectorMap[pod.Name] = allocation.NewCollector(pod.Name, pod.Spec.NodeName) } collectorsDiscovered.Set(float64(len(collectorMap))) fn(collectorMap) } func (k *Watcher) Close() { close(k.close) } func (k *Watcher) isPodUnhealthy(pod *v1.Pod, collectorNotReadyGracePeriod time.Duration) bool { if collectorNotReadyGracePeriod == 0*time.Second { return false } isPodUnhealthy := false timeNow := time.Now() // stop assigning targets to a non-Running pod that has lasted for a specific period if pod.Status.Phase != v1.PodRunning && pod.Status.StartTime != nil && (timeNow.Sub(pod.Status.StartTime.Time) > collectorNotReadyGracePeriod) { isPodUnhealthy = true } // stop assigning targets to a non-Ready pod that has lasted for a specific period for _, condition := range pod.Status.Conditions { if condition.Type == v1.PodReady && condition.Status != v1.ConditionTrue && (timeNow.Sub(condition.LastTransitionTime.Time) > collectorNotReadyGracePeriod) { isPodUnhealthy = true } } return isPodUnhealthy }