in otelcollector/otel-allocator/internal/watcher/promOperator.go [42:139]
func NewPrometheusCRWatcher(ctx context.Context, logger logr.Logger, cfg allocatorconfig.Config) (*PrometheusCRWatcher, error) {
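	// promLogger feeds the prometheus-operator helpers (warnings and above go to stderr),
	// while slogger adapts the allocator's logr.Logger for components that expect a *slog.Logger.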
	promLogger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelWarn}))
	slogger := slog.New(logr.ToSlogHandler(logger))
	var resourceSelector *prometheus.ResourceSelector

	mClient, err := monitoringclient.NewForConfig(cfg.ClusterConfig)
	if err != nil {
		return nil, err
	}
	clientset, err := kubernetes.NewForConfig(cfg.ClusterConfig)
	if err != nil {
		return nil, err
	}
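
	// Build shared informer factories for the monitoring.coreos.com CRDs, restricted to the
	// namespaces permitted by the PrometheusCR allow/deny lists.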
	allowList, denyList := cfg.PrometheusCR.GetAllowDenyLists()
	factory := informers.NewMonitoringInformerFactories(allowList, denyList, mClient, allocatorconfig.DefaultResyncTime, nil)
	monitoringInformers, err := getInformers(factory)
	if err != nil {
		return nil, err
	}
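
	// Construct an in-memory Prometheus CR that mirrors the allocator's configuration. It is never
	// applied to the cluster; it only feeds the prometheus-operator config generator and resource
	// selector with the selectors and scrape settings configured for the target allocator.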
	// We want to use EndpointSlice-based service discovery by default.
	serviceDiscoveryRole := monitoringv1.ServiceDiscoveryRole("EndpointSlice")

	// TODO: We should make these durations configurable.
	prom := &monitoringv1.Prometheus{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: cfg.CollectorNamespace,
		},
		Spec: monitoringv1.PrometheusSpec{
			CommonPrometheusFields: monitoringv1.CommonPrometheusFields{
				ScrapeInterval:                  monitoringv1.Duration(cfg.PrometheusCR.ScrapeInterval.String()),
				PodMonitorSelector:              cfg.PrometheusCR.PodMonitorSelector,
				PodMonitorNamespaceSelector:     cfg.PrometheusCR.PodMonitorNamespaceSelector,
				ServiceMonitorSelector:          cfg.PrometheusCR.ServiceMonitorSelector,
				ServiceMonitorNamespaceSelector: cfg.PrometheusCR.ServiceMonitorNamespaceSelector,
				ScrapeConfigSelector:            cfg.PrometheusCR.ScrapeConfigSelector,
				ScrapeConfigNamespaceSelector:   cfg.PrometheusCR.ScrapeConfigNamespaceSelector,
				ProbeSelector:                   cfg.PrometheusCR.ProbeSelector,
				ProbeNamespaceSelector:          cfg.PrometheusCR.ProbeNamespaceSelector,
				ServiceDiscoveryRole:            &serviceDiscoveryRole,
			},
			EvaluationInterval: monitoringv1.Duration("30s"),
		},
	}
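
	// The config generator renders Prometheus scrape configuration from the CRs selected through
	// the synthetic Prometheus object above, with EndpointSlice support enabled to match the
	// discovery role we set.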
	generator, err := prometheus.NewConfigGenerator(promLogger, prom, prometheus.WithEndpointSliceSupport())
	if err != nil {
		return nil, err
	}
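
	// The asset store caches Secrets and ConfigMaps referenced by the monitoring CRs (TLS material,
	// credentials, and similar), and the operator metrics plus event recorder are dependencies of
	// the resource selector created below.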
	store := assets.NewStoreBuilder(clientset.CoreV1(), clientset.CoreV1())
	promRegisterer := prometheusgoclient.NewRegistry()
	operatorMetrics := operator.NewMetrics(promRegisterer)
	eventRecorderFactory := operator.NewEventRecorderFactory(false)
	eventRecorder := eventRecorderFactory(clientset, "target-allocator")
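
	// A namespace informer is needed so namespace changes are noticed by the *NamespaceSelector
	// fields above; creation is retried with the default backoff on any error before giving up.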
	var nsMonInf cache.SharedIndexInformer
	getNamespaceInformerErr := retry.OnError(retry.DefaultRetry,
		func(err error) bool {
			logger.Error(err, "Retrying namespace informer creation in promOperator CRD watcher")
			return true
		}, func() error {
			nsMonInf, err = getNamespaceInformer(ctx, allowList, denyList, promLogger, clientset, operatorMetrics)
			return err
		})
	if getNamespaceInformerErr != nil {
		logger.Error(getNamespaceInformerErr, "Failed to create namespace informer in promOperator CRD watcher")
		return nil, getNamespaceInformerErr
	}
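
	// The resource selector performs the actual matching of ServiceMonitors, PodMonitors, Probes,
	// and ScrapeConfigs against the synthetic Prometheus CR. A failure here is only logged, so the
	// watcher is still constructed (with a nil selector) rather than failing outright.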
	resourceSelector, err = prometheus.NewResourceSelector(slogger, prom, store, nsMonInf, operatorMetrics, eventRecorder)
	if err != nil {
		logger.Error(err, "Failed to create resource selector in promOperator CRD watcher")
	}
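
	// Everything assembled above is retained on the watcher, including the per-kind namespace
	// selectors taken from the allocator configuration.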
	return &PrometheusCRWatcher{
		logger:                          slogger,
		kubeMonitoringClient:            mClient,
		k8sClient:                       clientset,
		informers:                       monitoringInformers,
		nsInformer:                      nsMonInf,
		stopChannel:                     make(chan struct{}),
		eventInterval:                   minEventInterval,
		configGenerator:                 generator,
		kubeConfigPath:                  cfg.KubeConfigFilePath,
		podMonitorNamespaceSelector:     cfg.PrometheusCR.PodMonitorNamespaceSelector,
		serviceMonitorNamespaceSelector: cfg.PrometheusCR.ServiceMonitorNamespaceSelector,
		scrapeConfigNamespaceSelector:   cfg.PrometheusCR.ScrapeConfigNamespaceSelector,
		probeNamespaceSelector:          cfg.PrometheusCR.ProbeNamespaceSelector,
		resourceSelector:                resourceSelector,
		store:                           store,
		prometheusCR:                    prom,
	}, nil
}