otelcollector/otel-allocator/internal/watcher/promOperator.go
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package watcher
import (
"context"
"fmt"
"log/slog"
"os"
"time"
"github.com/blang/semver/v4"
"github.com/go-logr/logr"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
promv1alpha1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1alpha1"
"github.com/prometheus-operator/prometheus-operator/pkg/assets"
monitoringclient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned"
"github.com/prometheus-operator/prometheus-operator/pkg/informers"
"github.com/prometheus-operator/prometheus-operator/pkg/k8sutil"
"github.com/prometheus-operator/prometheus-operator/pkg/listwatch"
"github.com/prometheus-operator/prometheus-operator/pkg/operator"
"github.com/prometheus-operator/prometheus-operator/pkg/prometheus"
prometheusgoclient "github.com/prometheus/client_golang/prometheus"
promconfig "github.com/prometheus/prometheus/config"
kubeDiscovery "github.com/prometheus/prometheus/discovery/kubernetes"
"gopkg.in/yaml.v2"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
allocatorconfig "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/internal/config"
)
const (
resyncPeriod = 5 * time.Minute
minEventInterval = time.Second * 5
)
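// NewPrometheusCRWatcher builds a PrometheusCRWatcher from the allocator configuration: it creates the
// monitoring and Kubernetes clients, the informers for the Prometheus Operator CRs, a namespace informer,
// and the prometheus-operator config generator and resource selector used by LoadConfig.
//
// A minimal usage sketch (hypothetical caller; eventsChan and errorsChan are placeholders):
//
//	w, err := NewPrometheusCRWatcher(ctx, logger, cfg)
//	if err != nil {
//		return err
//	}
//	go func() { _ = w.Watch(eventsChan, errorsChan) }()
//	defer w.Close()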
func NewPrometheusCRWatcher(ctx context.Context, logger logr.Logger, cfg allocatorconfig.Config) (*PrometheusCRWatcher, error) {
promLogger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelWarn}))
slogger := slog.New(logr.ToSlogHandler(logger))
var resourceSelector *prometheus.ResourceSelector
mClient, err := monitoringclient.NewForConfig(cfg.ClusterConfig)
if err != nil {
return nil, err
}
clientset, err := kubernetes.NewForConfig(cfg.ClusterConfig)
if err != nil {
return nil, err
}
allowList, denyList := cfg.PrometheusCR.GetAllowDenyLists()
factory := informers.NewMonitoringInformerFactories(allowList, denyList, mClient, allocatorconfig.DefaultResyncTime, nil)
monitoringInformers, err := getInformers(factory)
if err != nil {
return nil, err
}
// Use EndpointSlice-based service discovery by default.
serviceDiscoveryRole := monitoringv1.ServiceDiscoveryRole("EndpointSlice")
// TODO: We should make these durations configurable
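// This in-memory Prometheus object only carries the allocator's selectors and scrape interval; it feeds the
// prometheus-operator config generator and resource selector below and is never applied to the cluster.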
prom := &monitoringv1.Prometheus{
ObjectMeta: metav1.ObjectMeta{
Namespace: cfg.CollectorNamespace,
},
Spec: monitoringv1.PrometheusSpec{
CommonPrometheusFields: monitoringv1.CommonPrometheusFields{
ScrapeInterval: monitoringv1.Duration(cfg.PrometheusCR.ScrapeInterval.String()),
PodMonitorSelector: cfg.PrometheusCR.PodMonitorSelector,
PodMonitorNamespaceSelector: cfg.PrometheusCR.PodMonitorNamespaceSelector,
ServiceMonitorSelector: cfg.PrometheusCR.ServiceMonitorSelector,
ServiceMonitorNamespaceSelector: cfg.PrometheusCR.ServiceMonitorNamespaceSelector,
ScrapeConfigSelector: cfg.PrometheusCR.ScrapeConfigSelector,
ScrapeConfigNamespaceSelector: cfg.PrometheusCR.ScrapeConfigNamespaceSelector,
ProbeSelector: cfg.PrometheusCR.ProbeSelector,
ProbeNamespaceSelector: cfg.PrometheusCR.ProbeNamespaceSelector,
ServiceDiscoveryRole: &serviceDiscoveryRole,
},
EvaluationInterval: monitoringv1.Duration("30s"),
},
}
generator, err := prometheus.NewConfigGenerator(promLogger, prom, prometheus.WithEndpointSliceSupport())
if err != nil {
return nil, err
}
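// The store builder caches Secrets and ConfigMaps (TLS material, credentials) referenced by the monitoring CRs.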
store := assets.NewStoreBuilder(clientset.CoreV1(), clientset.CoreV1())
promRegisterer := prometheusgoclient.NewRegistry()
operatorMetrics := operator.NewMetrics(promRegisterer)
eventRecorderFactory := operator.NewEventRecorderFactory(false)
eventRecorder := eventRecorderFactory(clientset, "target-allocator")
var nsMonInf cache.SharedIndexInformer
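// Building the namespace informer requires calls to the API server (server version and namespace access
// checks), so transient failures are retried.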
getNamespaceInformerErr := retry.OnError(retry.DefaultRetry,
func(err error) bool {
logger.Error(err, "Retrying namespace informer creation in promOperator CRD watcher")
return true
}, func() error {
nsMonInf, err = getNamespaceInformer(ctx, allowList, denyList, promLogger, clientset, operatorMetrics)
return err
})
if getNamespaceInformerErr != nil {
logger.Error(getNamespaceInformerErr, "Failed to create namespace informer in promOperator CRD watcher")
return nil, getNamespaceInformerErr
}
resourceSelector, err = prometheus.NewResourceSelector(slogger, prom, store, nsMonInf, operatorMetrics, eventRecorder)
if err != nil {
logger.Error(err, "Failed to create resource selector in promOperator CRD watcher")
}
return &PrometheusCRWatcher{
logger: slogger,
kubeMonitoringClient: mClient,
k8sClient: clientset,
informers: monitoringInformers,
nsInformer: nsMonInf,
stopChannel: make(chan struct{}),
eventInterval: minEventInterval,
configGenerator: generator,
kubeConfigPath: cfg.KubeConfigFilePath,
podMonitorNamespaceSelector: cfg.PrometheusCR.PodMonitorNamespaceSelector,
serviceMonitorNamespaceSelector: cfg.PrometheusCR.ServiceMonitorNamespaceSelector,
scrapeConfigNamespaceSelector: cfg.PrometheusCR.ScrapeConfigNamespaceSelector,
probeNamespaceSelector: cfg.PrometheusCR.ProbeNamespaceSelector,
resourceSelector: resourceSelector,
store: store,
prometheusCR: prom,
}, nil
}
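// PrometheusCRWatcher watches Prometheus Operator custom resources (ServiceMonitor, PodMonitor, Probe and
// ScrapeConfig) plus namespaces, and turns them into Prometheus scrape configuration for the target allocator.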
type PrometheusCRWatcher struct {
logger *slog.Logger
kubeMonitoringClient monitoringclient.Interface
k8sClient kubernetes.Interface
informers map[string]*informers.ForResource
nsInformer cache.SharedIndexInformer
eventInterval time.Duration
stopChannel chan struct{}
configGenerator *prometheus.ConfigGenerator
kubeConfigPath string
podMonitorNamespaceSelector *metav1.LabelSelector
serviceMonitorNamespaceSelector *metav1.LabelSelector
scrapeConfigNamespaceSelector *metav1.LabelSelector
probeNamespaceSelector *metav1.LabelSelector
resourceSelector *prometheus.ResourceSelector
store *assets.StoreBuilder
prometheusCR *monitoringv1.Prometheus
}
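// getNamespaceInformer returns a shared informer for Namespaces that match the allow/deny lists, backed by
// the prometheus-operator namespace list-watcher and instrumented with the operator metrics.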
func getNamespaceInformer(ctx context.Context, allowList, denyList map[string]struct{}, promOperatorLogger *slog.Logger, clientset kubernetes.Interface, operatorMetrics *operator.Metrics) (cache.SharedIndexInformer, error) {
kubernetesVersion, err := clientset.Discovery().ServerVersion()
if err != nil {
return nil, err
}
kubernetesSemverVersion, err := semver.ParseTolerant(kubernetesVersion.String())
if err != nil {
return nil, err
}
lw, _, err := listwatch.NewNamespaceListWatchFromClient(
ctx,
promOperatorLogger,
kubernetesSemverVersion,
clientset.CoreV1(),
clientset.AuthorizationV1().SelfSubjectAccessReviews(),
allowList,
denyList,
)
if err != nil {
return nil, err
}
return cache.NewSharedIndexInformer(
operatorMetrics.NewInstrumentedListerWatcher(lw),
&v1.Namespace{}, resyncPeriod, cache.Indexers{},
), nil
}
// getInformers returns a map of informers for the given resources.
func getInformers(factory informers.FactoriesForNamespaces) (map[string]*informers.ForResource, error) {
serviceMonitorInformers, err := informers.NewInformersForResource(factory, monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.ServiceMonitorName))
if err != nil {
return nil, err
}
podMonitorInformers, err := informers.NewInformersForResource(factory, monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.PodMonitorName))
if err != nil {
return nil, err
}
probeInformers, err := informers.NewInformersForResource(factory, monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.ProbeName))
if err != nil {
return nil, err
}
scrapeConfigInformers, err := informers.NewInformersForResource(factory, promv1alpha1.SchemeGroupVersion.WithResource(promv1alpha1.ScrapeConfigName))
if err != nil {
return nil, err
}
return map[string]*informers.ForResource{
monitoringv1.ServiceMonitorName: serviceMonitorInformers,
monitoringv1.PodMonitorName: podMonitorInformers,
monitoringv1.ProbeName: probeInformers,
promv1alpha1.ScrapeConfigName: scrapeConfigInformers,
}, nil
}
// Watch starts the wrapped informers, waits for their initial cache sync, and forwards rate-limited change
// notifications to the upstreamEvents channel until the watcher is closed.
func (w *PrometheusCRWatcher) Watch(upstreamEvents chan Event, upstreamErrors chan error) error {
success := true
// this channel needs to be buffered because notifications are asynchronous and neither producers nor consumers wait
notifyEvents := make(chan struct{}, 1)
if w.nsInformer != nil {
go w.nsInformer.Run(w.stopChannel)
if ok := w.WaitForNamedCacheSync("namespace", w.nsInformer.HasSynced); !ok {
success = false
}
_, _ = w.nsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
old := oldObj.(*v1.Namespace)
cur := newObj.(*v1.Namespace)
// Periodic resync may resend the Namespace without changes
// in-between.
if old.ResourceVersion == cur.ResourceVersion {
return
}
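// Notify downstream only if the label change affects at least one of the configured namespace selectors.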
for name, selector := range map[string]*metav1.LabelSelector{
"PodMonitorNamespaceSelector": w.podMonitorNamespaceSelector,
"ServiceMonitorNamespaceSelector": w.serviceMonitorNamespaceSelector,
"ProbeNamespaceSelector": w.probeNamespaceSelector,
"ScrapeConfigNamespaceSelector": w.scrapeConfigNamespaceSelector,
} {
sync, err := k8sutil.LabelSelectionHasChanged(old.Labels, cur.Labels, selector)
if err != nil {
w.logger.Error("Failed to check label selection between namespaces while handling namespace updates", "selector", name, "error", err)
return
}
if sync {
select {
case notifyEvents <- struct{}{}:
default:
}
return
}
}
},
})
} else {
w.logger.Info("Unable to watch namespaces since namespace informer is nil")
}
for name, resource := range w.informers {
resource.Start(w.stopChannel)
if ok := w.WaitForNamedCacheSync(name, resource.HasSynced); !ok {
w.logger.Info("skipping informer", "informer", name)
continue
}
// only send an event notification if there isn't one already
resource.AddEventHandler(cache.ResourceEventHandlerFuncs{
// these functions only write to the notification channel if it's empty to avoid blocking
// if scrape config updates are being rate-limited
AddFunc: func(obj interface{}) {
select {
case notifyEvents <- struct{}{}:
default:
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
select {
case notifyEvents <- struct{}{}:
default:
}
},
DeleteFunc: func(obj interface{}) {
select {
case notifyEvents <- struct{}{}:
default:
}
},
})
}
if !success {
return fmt.Errorf("failed to sync one of the caches")
}
// limit the rate of outgoing events
w.rateLimitedEventSender(upstreamEvents, notifyEvents)
<-w.stopChannel
return nil
}
// rateLimitedEventSender sends an event on the upstreamEvents channel whenever it receives a notification on
// the notifyEvents channel, but no more often than once per w.eventInterval.
func (w *PrometheusCRWatcher) rateLimitedEventSender(upstreamEvents chan Event, notifyEvents chan struct{}) {
ticker := time.NewTicker(w.eventInterval)
defer ticker.Stop()
event := Event{
Source: EventSourcePrometheusCR,
Watcher: Watcher(w),
}
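// On every tick, forward at most one pending notification; if the upstream channel is full, re-queue the
// notification so it is retried on the next tick.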
for {
select {
case <-w.stopChannel:
return
case <-ticker.C: // throttle events to avoid excessive updates
select {
case <-notifyEvents:
select {
case upstreamEvents <- event:
default: // put the notification back in the queue if we can't send it upstream
select {
case notifyEvents <- struct{}{}:
default:
}
}
default:
}
}
}
}
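// Close shuts the watcher down by closing its stop channel, which stops the informers, the event sender and
// any in-flight cache sync.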
func (w *PrometheusCRWatcher) Close() error {
close(w.stopChannel)
return nil
}
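// LoadConfig selects the matching ServiceMonitors, PodMonitors, Probes and ScrapeConfigs, generates a
// Prometheus server configuration from them and parses it into a promconfig.Config. If the resource selector
// could not be created, an empty config is returned instead.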
func (w *PrometheusCRWatcher) LoadConfig(ctx context.Context) (*promconfig.Config, error) {
promCfg := &promconfig.Config{}
if w.resourceSelector != nil {
serviceMonitorInstances, err := w.resourceSelector.SelectServiceMonitors(ctx, w.informers[monitoringv1.ServiceMonitorName].ListAllByNamespace)
if err != nil {
return nil, err
}
podMonitorInstances, err := w.resourceSelector.SelectPodMonitors(ctx, w.informers[monitoringv1.PodMonitorName].ListAllByNamespace)
if err != nil {
return nil, err
}
probeInstances, err := w.resourceSelector.SelectProbes(ctx, w.informers[monitoringv1.ProbeName].ListAllByNamespace)
if err != nil {
return nil, err
}
scrapeConfigInstances, err := w.resourceSelector.SelectScrapeConfigs(ctx, w.informers[promv1alpha1.ScrapeConfigName].ListAllByNamespace)
if err != nil {
return nil, err
}
generatedConfig, err := w.configGenerator.GenerateServerConfiguration(
w.prometheusCR,
serviceMonitorInstances,
podMonitorInstances,
probeInstances,
scrapeConfigInstances,
w.store,
nil,
nil,
nil,
[]string{})
if err != nil {
return nil, err
}
unmarshalErr := yaml.Unmarshal(generatedConfig, promCfg)
if unmarshalErr != nil {
return nil, unmarshalErr
}
// Set the kubeconfig path on Kubernetes service discovery configs; otherwise kubernetes_sd always attempts
// in-cluster authentication, even when a kubeconfig was detected.
for _, scrapeConfig := range promCfg.ScrapeConfigs {
for _, serviceDiscoveryConfig := range scrapeConfig.ServiceDiscoveryConfigs {
if serviceDiscoveryConfig.Name() == "kubernetes" {
sdConfig := interface{}(serviceDiscoveryConfig).(*kubeDiscovery.SDConfig)
sdConfig.KubeConfig = w.kubeConfigPath
}
}
}
return promCfg, nil
} else {
w.logger.Info("Unable to load config since resource selector is nil, returning empty prometheus config")
return promCfg, nil
}
}
// WaitForNamedCacheSync adds a timeout to the informer's wait for the cache to be ready.
// If the PrometheusCRWatcher is unable to sync an informer within 15 seconds, the wait is cancelled and the
// method returns false; a successful sync returns true. The wait is also cancelled if the watcher's
// stopChannel is closed before the sync completes.
//
// This method is inspired by the upstream prometheus-operator implementation, with a shorter timeout
// and support for the PrometheusCRWatcher's stopChannel.
// https://github.com/prometheus-operator/prometheus-operator/blob/293c16c854ce69d1da9fdc8f0705de2d67bfdbfa/pkg/operator/operator.go#L433
func (w *PrometheusCRWatcher) WaitForNamedCacheSync(controllerName string, inf cache.InformerSynced) bool {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
t := time.NewTicker(time.Second * 5)
defer t.Stop()
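// Log progress periodically and cancel the wait early if the watcher is stopped.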
go func() {
for {
select {
case <-t.C:
w.logger.Debug("cache sync not yet completed")
case <-ctx.Done():
return
case <-w.stopChannel:
w.logger.Warn("stop received, shutting down cache syncing")
cancel()
return
}
}
}()
ok := cache.WaitForNamedCacheSync(controllerName, ctx.Done(), inf)
if !ok {
w.logger.Error("failed to sync cache")
} else {
w.logger.Debug("successfully synced cache")
}
return ok
}