cmd/amazon-cloudwatch-agent-target-allocator/watcher/promOperator.go

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package watcher

import (
"context"
"fmt"
"os"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/go-logr/logr"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
promv1alpha1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1alpha1"
"github.com/prometheus-operator/prometheus-operator/pkg/assets"
monitoringclient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned"
"github.com/prometheus-operator/prometheus-operator/pkg/informers"
"github.com/prometheus-operator/prometheus-operator/pkg/prometheus"
promconfig "github.com/prometheus/prometheus/config"
kubeDiscovery "github.com/prometheus/prometheus/discovery/kubernetes"
"gopkg.in/yaml.v2"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
allocatorconfig "github.com/aws/amazon-cloudwatch-agent-operator/cmd/amazon-cloudwatch-agent-target-allocator/config"
)
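
// minEventInterval is the minimum time between events forwarded upstream; it seeds the
// watcher's eventInterval and drives the ticker in rateLimitedEventSender.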
const minEventInterval = time.Second * 5
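
// NewPrometheusCRWatcher returns a PrometheusCRWatcher that discovers ServiceMonitor and
// PodMonitor custom resources and turns them into Prometheus scrape configuration.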
func NewPrometheusCRWatcher(logger logr.Logger, cfg allocatorconfig.Config) (*PrometheusCRWatcher, error) {
mClient, err := monitoringclient.NewForConfig(cfg.ClusterConfig)
if err != nil {
return nil, err
}
clientset, err := kubernetes.NewForConfig(cfg.ClusterConfig)
if err != nil {
return nil, err
}
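	// Build informer factories for monitoring resources across all namespaces; no namespaces are denied.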
factory := informers.NewMonitoringInformerFactories(map[string]struct{}{v1.NamespaceAll: {}}, map[string]struct{}{}, mClient, allocatorconfig.DefaultResyncTime, nil) //TODO decide what strategy to use regarding namespaces
monitoringInformers, err := getInformers(factory)
if err != nil {
return nil, err
}
// TODO: We should make these durations configurable
prom := &monitoringv1.Prometheus{
Spec: monitoringv1.PrometheusSpec{
CommonPrometheusFields: monitoringv1.CommonPrometheusFields{
ScrapeInterval: monitoringv1.Duration(cfg.PrometheusCR.ScrapeInterval.String()),
},
},
}
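	// The prometheus-operator config generator logs through go-kit; only warnings and errors are surfaced.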
promOperatorLogger := level.NewFilter(log.NewLogfmtLogger(os.Stderr), level.AllowWarn())
generator, err := prometheus.NewConfigGenerator(promOperatorLogger, prom, true)
if err != nil {
return nil, err
}
servMonSelector := getSelector(cfg.ServiceMonitorSelector)
podMonSelector := getSelector(cfg.PodMonitorSelector)
return &PrometheusCRWatcher{
logger: logger,
kubeMonitoringClient: mClient,
k8sClient: clientset,
informers: monitoringInformers,
stopChannel: make(chan struct{}),
eventInterval: minEventInterval,
configGenerator: generator,
kubeConfigPath: cfg.KubeConfigFilePath,
serviceMonitorSelector: servMonSelector,
podMonitorSelector: podMonSelector,
}, nil
}
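
// PrometheusCRWatcher watches ServiceMonitor and PodMonitor custom resources via
// prometheus-operator informers and emits rate-limited events when they change.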
type PrometheusCRWatcher struct {
logger logr.Logger
kubeMonitoringClient monitoringclient.Interface
k8sClient kubernetes.Interface
informers map[string]*informers.ForResource
eventInterval time.Duration
stopChannel chan struct{}
configGenerator *prometheus.ConfigGenerator
kubeConfigPath string
serviceMonitorSelector labels.Selector
podMonitorSelector labels.Selector
}
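
// getSelector converts a label map from the allocator config into a labels.Selector.
// A nil map yields an empty selector, which matches every monitor.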
func getSelector(s map[string]string) labels.Selector {
if s == nil {
return labels.NewSelector()
}
return labels.SelectorFromSet(s)
}

// getInformers returns a map of informers for the given resources.
func getInformers(factory informers.FactoriesForNamespaces) (map[string]*informers.ForResource, error) {
serviceMonitorInformers, err := informers.NewInformersForResource(factory, monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.ServiceMonitorName))
if err != nil {
return nil, err
}
podMonitorInformers, err := informers.NewInformersForResource(factory, monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.PodMonitorName))
if err != nil {
return nil, err
}
return map[string]*informers.ForResource{
monitoringv1.ServiceMonitorName: serviceMonitorInformers,
monitoringv1.PodMonitorName: podMonitorInformers,
}, nil
}

// Watch starts the wrapped informers, waits for their caches to sync, and forwards change
// notifications to upstreamEvents until the watcher is closed.
func (w *PrometheusCRWatcher) Watch(upstreamEvents chan Event, upstreamErrors chan error) error {
success := true
// this channel needs to be buffered because notifications are asynchronous and neither producers nor consumers wait
notifyEvents := make(chan struct{}, 1)
for name, resource := range w.informers {
resource.Start(w.stopChannel)
if ok := cache.WaitForNamedCacheSync(name, w.stopChannel, resource.HasSynced); !ok {
success = false
}
// only send an event notification if there isn't one already
resource.AddEventHandler(cache.ResourceEventHandlerFuncs{
// these functions only write to the notification channel if it's empty to avoid blocking
// if scrape config updates are being rate-limited
AddFunc: func(obj interface{}) {
select {
case notifyEvents <- struct{}{}:
default:
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
select {
case notifyEvents <- struct{}{}:
default:
}
},
DeleteFunc: func(obj interface{}) {
select {
case notifyEvents <- struct{}{}:
default:
}
},
})
}
if !success {
return fmt.Errorf("failed to sync cache")
}
// limit the rate of outgoing events
w.rateLimitedEventSender(upstreamEvents, notifyEvents)
<-w.stopChannel
return nil
}

// rateLimitedEventSender sends an event to the upstreamEvents channel whenever it gets a notification
// on the notifyEvents channel, but no more frequently than once per w.eventInterval.
func (w *PrometheusCRWatcher) rateLimitedEventSender(upstreamEvents chan Event, notifyEvents chan struct{}) {
ticker := time.NewTicker(w.eventInterval)
defer ticker.Stop()
event := Event{
Source: EventSourcePrometheusCR,
Watcher: Watcher(w),
}
for {
select {
case <-w.stopChannel:
return
case <-ticker.C: // throttle events to avoid excessive updates
select {
case <-notifyEvents:
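				// a change was observed since the last tick; try to forward it upstream without blocking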
select {
case upstreamEvents <- event:
default: // put the notification back in the queue if we can't send it upstream
select {
case notifyEvents <- struct{}{}:
default:
}
}
default:
}
}
}
}
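
// Close stops the informers and the event sender by closing the shared stop channel.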
func (w *PrometheusCRWatcher) Close() error {
close(w.stopChannel)
return nil
}
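
// LoadConfig lists the ServiceMonitors and PodMonitors matching the configured selectors, loads the
// credentials they reference into an assets store, and renders them into a Prometheus configuration.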
func (w *PrometheusCRWatcher) LoadConfig(ctx context.Context) (*promconfig.Config, error) {
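	// The assets store caches the Secrets and ConfigMaps referenced by monitor endpoints
	// (TLS material, basic auth, OAuth2, and authorization credentials).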
store := assets.NewStore(w.k8sClient.CoreV1(), w.k8sClient.CoreV1())
serviceMonitorInstances := make(map[string]*monitoringv1.ServiceMonitor)
smRetrieveErr := w.informers[monitoringv1.ServiceMonitorName].ListAll(w.serviceMonitorSelector, func(sm interface{}) {
monitor := sm.(*monitoringv1.ServiceMonitor)
key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(monitor)
w.addStoreAssetsForServiceMonitor(ctx, monitor.Name, monitor.Namespace, monitor.Spec.Endpoints, store)
serviceMonitorInstances[key] = monitor
})
if smRetrieveErr != nil {
return nil, smRetrieveErr
}
podMonitorInstances := make(map[string]*monitoringv1.PodMonitor)
pmRetrieveErr := w.informers[monitoringv1.PodMonitorName].ListAll(w.podMonitorSelector, func(pm interface{}) {
monitor := pm.(*monitoringv1.PodMonitor)
key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(monitor)
w.addStoreAssetsForPodMonitor(ctx, monitor.Name, monitor.Namespace, monitor.Spec.PodMetricsEndpoints, store)
podMonitorInstances[key] = monitor
})
if pmRetrieveErr != nil {
return nil, pmRetrieveErr
}
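	// Render the collected monitors into a Prometheus server configuration. Inputs this allocator
	// does not use (probes, ScrapeConfig CRs, and the remaining server settings) are passed as
	// empty or nil values.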
generatedConfig, err := w.configGenerator.GenerateServerConfiguration(
ctx,
"30s",
"",
nil,
nil,
monitoringv1.TSDBSpec{},
nil,
nil,
serviceMonitorInstances,
podMonitorInstances,
map[string]*monitoringv1.Probe{},
map[string]*promv1alpha1.ScrapeConfig{},
store,
nil,
nil,
nil,
[]string{})
if err != nil {
return nil, err
}
promCfg := &promconfig.Config{}
unmarshalErr := yaml.Unmarshal(generatedConfig, promCfg)
if unmarshalErr != nil {
return nil, unmarshalErr
}
	// Set the kubeconfig path on every Kubernetes service discovery config; otherwise kubernetes_sd
	// always attempts in-cluster authentication, even when running with a detected kubeconfig.
for _, scrapeConfig := range promCfg.ScrapeConfigs {
for _, serviceDiscoveryConfig := range scrapeConfig.ServiceDiscoveryConfigs {
if serviceDiscoveryConfig.Name() == "kubernetes" {
sdConfig := interface{}(serviceDiscoveryConfig).(*kubeDiscovery.SDConfig)
sdConfig.KubeConfig = w.kubeConfigPath
}
}
}
return promCfg, nil
}

// addStoreAssetsForServiceMonitor adds authentication / authorization related information to the assets store,
// based on the service monitor and endpoints specs.
// This code borrows from
// https://github.com/prometheus-operator/prometheus-operator/blob/06b5c4189f3f72737766d86103d049115c3aff48/pkg/prometheus/resource_selector.go#L73.
func (w *PrometheusCRWatcher) addStoreAssetsForServiceMonitor(
ctx context.Context,
smName, smNamespace string,
endps []monitoringv1.Endpoint,
store *assets.Store,
) {
var err error
for i, endp := range endps {
objKey := fmt.Sprintf("serviceMonitor/%s/%s/%d", smNamespace, smName, i)
if err = store.AddSafeAuthorizationCredentials(ctx, smNamespace, endp.Authorization, objKey); err != nil {
break
}
if err = store.AddBasicAuth(ctx, smNamespace, endp.BasicAuth, objKey); err != nil {
break
}
if endp.TLSConfig != nil {
if err = store.AddTLSConfig(ctx, smNamespace, endp.TLSConfig); err != nil {
break
}
}
if err = store.AddOAuth2(ctx, smNamespace, endp.OAuth2, objKey); err != nil {
break
}
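		// register the authorization credentials under the auth-scoped key as well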
smAuthKey := fmt.Sprintf("serviceMonitor/auth/%s/%s/%d", smNamespace, smName, i)
if err = store.AddSafeAuthorizationCredentials(ctx, smNamespace, endp.Authorization, smAuthKey); err != nil {
break
}
}
if err != nil {
w.logger.Error(err, "Failed to obtain credentials for a ServiceMonitor", "serviceMonitor", smName)
}
}

// addStoreAssetsForPodMonitor adds authentication / authorization related information to the assets store,
// based on the pod monitor and pod metrics endpoints specs.
// This code borrows from
// https://github.com/prometheus-operator/prometheus-operator/blob/06b5c4189f3f72737766d86103d049115c3aff48/pkg/prometheus/resource_selector.go#L314.
func (w *PrometheusCRWatcher) addStoreAssetsForPodMonitor(
ctx context.Context,
pmName, pmNamespace string,
podMetricsEndps []monitoringv1.PodMetricsEndpoint,
store *assets.Store,
) {
var err error
for i, endp := range podMetricsEndps {
objKey := fmt.Sprintf("podMonitor/%s/%s/%d", pmNamespace, pmName, i)
if err = store.AddSafeAuthorizationCredentials(ctx, pmNamespace, endp.Authorization, objKey); err != nil {
break
}
if err = store.AddBasicAuth(ctx, pmNamespace, endp.BasicAuth, objKey); err != nil {
break
}
if endp.TLSConfig != nil {
if err = store.AddSafeTLSConfig(ctx, pmNamespace, &endp.TLSConfig.SafeTLSConfig); err != nil {
break
}
}
if err = store.AddOAuth2(ctx, pmNamespace, endp.OAuth2, objKey); err != nil {
break
}
smAuthKey := fmt.Sprintf("podMonitor/auth/%s/%s/%d", pmNamespace, pmName, i)
if err = store.AddSafeAuthorizationCredentials(ctx, pmNamespace, endp.Authorization, smAuthKey); err != nil {
break
}
}
if err != nil {
w.logger.Error(err, "Failed to obtain credentials for a PodMonitor", "podMonitor", pmName)
}
}