cmd/amazon-cloudwatch-agent-target-allocator/collector/collector.go:

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package collector

import (
	"context"
	"os"
	"time"

	"github.com/go-logr/logr"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"

	"github.com/aws/amazon-cloudwatch-agent-operator/cmd/amazon-cloudwatch-agent-target-allocator/allocation"
)

const (
	watcherTimeout = 15 * time.Minute
)

var (
	ns = os.Getenv("OTELCOL_NAMESPACE")

	collectorsDiscovered = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "amazon_cloudwatch_agent_allocator_collectors_discovered",
		Help: "Number of collectors discovered.",
	})
)

// Client watches collector pods in the target namespace and reports
// membership changes back to the allocator.
type Client struct {
	log       logr.Logger
	k8sClient kubernetes.Interface
	close     chan struct{}
}

// NewClient builds a Kubernetes clientset from the given config and returns
// a Client ready to watch collector pods.
func NewClient(logger logr.Logger, kubeConfig *rest.Config) (*Client, error) {
	clientset, err := kubernetes.NewForConfig(kubeConfig)
	if err != nil {
		return &Client{}, err
	}

	return &Client{
		log:       logger.WithValues("component", "amazon-cloudwatch-agent-target-allocator"),
		k8sClient: clientset,
		close:     make(chan struct{}),
	}, nil
}

// Watch lists the collector pods matching labelMap, reports the initial set
// through fn, then keeps a pod watch open (restarting it whenever it times
// out) and invokes fn on every membership change until the client is closed.
func (k *Client) Watch(ctx context.Context, labelMap map[string]string, fn func(collectors map[string]*allocation.Collector)) error {
	collectorMap := map[string]*allocation.Collector{}

	opts := metav1.ListOptions{
		LabelSelector: labels.SelectorFromSet(labelMap).String(),
	}
	pods, err := k.k8sClient.CoreV1().Pods(ns).List(ctx, opts)
	if err != nil {
		k.log.Error(err, "Pod failure")
		os.Exit(1)
	}
	for i := range pods.Items {
		pod := pods.Items[i]
		if pod.GetObjectMeta().GetDeletionTimestamp() == nil {
			collectorMap[pod.Name] = allocation.NewCollector(pod.Name)
		}
	}
	fn(collectorMap)

	for {
		if !k.restartWatch(ctx, opts, collectorMap, fn) {
			return nil
		}
	}
}

// restartWatch opens a pod watch bounded by watcherTimeout and reports
// whether the caller should start another one.
func (k *Client) restartWatch(ctx context.Context, opts metav1.ListOptions, collectorMap map[string]*allocation.Collector, fn func(collectors map[string]*allocation.Collector)) bool {
	// add timeout to the context before calling Watch
	ctx, cancel := context.WithTimeout(ctx, watcherTimeout)
	defer cancel()
	watcher, err := k.k8sClient.CoreV1().Pods(ns).Watch(ctx, opts)
	if err != nil {
		k.log.Error(err, "unable to create collector pod watcher")
		return false
	}
	k.log.Info("Successfully started a collector pod watcher")
	if msg := runWatch(ctx, k, watcher.ResultChan(), collectorMap, fn); msg != "" {
		k.log.Info("Collector pod watch event stopped " + msg)
		return false
	}

	return true
}

// runWatch consumes pod events, keeping collectorMap and the
// collectors-discovered gauge in sync. It returns a non-empty message only
// when the watch loop should not be restarted.
func runWatch(ctx context.Context, k *Client, c <-chan watch.Event, collectorMap map[string]*allocation.Collector, fn func(collectors map[string]*allocation.Collector)) string {
	for {
		collectorsDiscovered.Set(float64(len(collectorMap)))
		select {
		case <-k.close:
			return "kubernetes client closed"
		case <-ctx.Done():
			// this means that the watcher most likely timed out
			return ""
		case event, ok := <-c:
			if !ok {
				k.log.Info("No event found. Restarting watch routine")
				return ""
			}

			pod, ok := event.Object.(*v1.Pod)
			if !ok {
				k.log.Info("No pod found in event Object. Restarting watch routine")
				return ""
			}

			switch event.Type { //nolint:exhaustive
			case watch.Added:
				collectorMap[pod.Name] = allocation.NewCollector(pod.Name)
			case watch.Deleted:
				delete(collectorMap, pod.Name)
			}
			fn(collectorMap)
		}
	}
}

// Close stops any active watch loop.
func (k *Client) Close() {
	close(k.close)
}
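
For context, a minimal sketch of how a caller might wire this package up follows. It is an illustration, not code from this repository: the in-cluster config, the logr.Discard() logger, and the label-selector values are all assumptions, and OTELCOL_NAMESPACE must be set in the environment because the package reads the watched namespace from it.

// Hypothetical caller of the collector package; not part of collector.go.
package main

import (
	"context"
	"log"

	"github.com/go-logr/logr"
	"k8s.io/client-go/rest"

	"github.com/aws/amazon-cloudwatch-agent-operator/cmd/amazon-cloudwatch-agent-target-allocator/allocation"
	"github.com/aws/amazon-cloudwatch-agent-operator/cmd/amazon-cloudwatch-agent-target-allocator/collector"
)

func main() {
	// Assumes the process runs inside a pod with a service account that
	// can list and watch pods in $OTELCOL_NAMESPACE.
	cfg, err := rest.InClusterConfig()
	if err != nil {
		log.Fatal(err)
	}

	// logr.Discard() keeps the sketch dependency-free; a real deployment
	// would pass a concrete logr implementation.
	c, err := collector.NewClient(logr.Discard(), cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	// The label set is an assumption; it must match the labels on the
	// collector pods this allocator should track.
	selector := map[string]string{"app.kubernetes.io/component": "amazon-cloudwatch-agent"}

	// fn receives the full collector set after the initial list and after
	// every Added/Deleted pod event.
	err = c.Watch(context.Background(), selector, func(collectors map[string]*allocation.Collector) {
		log.Printf("collector set changed: %d collectors", len(collectors))
	})
	if err != nil {
		log.Fatal(err)
	}
}

Note that Watch blocks: watch timeouts are absorbed internally by restarting the watch, so it only returns once the client is closed or a watcher cannot be created, which is why a real caller would typically run it in its own goroutine and call Close from a shutdown path.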