collector/scraper.go (476 lines of code) (raw):

package collector import ( "context" "fmt" "math/rand" "regexp" "sort" "strconv" "strings" "sync" "time" "github.com/Azure/adx-mon/metrics" "github.com/Azure/adx-mon/pkg/k8s" "github.com/Azure/adx-mon/pkg/logger" "github.com/Azure/adx-mon/pkg/prompb" "github.com/Azure/adx-mon/pkg/remote" "github.com/Azure/adx-mon/transform" v1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/cache" ) type ScraperOpts struct { NodeName string DefaultDropMetrics bool // AddLabels is a map of key/value pairs that will be added to all metrics. AddLabels map[string]string // DropLabels is a map of metric names regexes to label name regexes. When both match, the label will be dropped. DropLabels map[*regexp.Regexp]*regexp.Regexp // DropMetrics is a slice of regexes that drops metrics when the metric name matches. The metric name format // should match the Prometheus naming style before the metric is translated to a Kusto table name. DropMetrics []*regexp.Regexp KeepMetricsWithLabelValue map[*regexp.Regexp]*regexp.Regexp KeepMetrics []*regexp.Regexp // Database is the name of the database to write metrics to. Database string // ScrapeInterval is the interval at which to scrape metrics from the targets. ScrapeInterval time.Duration // ScrapeTimeout is the timeout for scraping metrics from a target. ScrapeTimeout time.Duration // DisableMetricsForwarding disables the forwarding of metrics to the remote write endpoint. DisableMetricsForwarding bool // DisableDiscovery disables the discovery of scrape targets. DisableDiscovery bool PodInformer *k8s.PodInformer // Targets is a list of static scrape targets. Targets []ScrapeTarget // MaxBatchSize is the maximum number of samples to send in a single batch. MaxBatchSize int RemoteClients []remote.RemoteWriteClient HealthChecker interface{ IsHealthy() bool } } func (s *ScraperOpts) RequestTransformer() *transform.RequestTransformer { return &transform.RequestTransformer{ DefaultDropMetrics: s.DefaultDropMetrics, AddLabels: s.AddLabels, DropLabels: s.DropLabels, DropMetrics: s.DropMetrics, KeepMetrics: s.KeepMetrics, KeepMetricsWithLabelValue: s.KeepMetricsWithLabelValue, } } type ScrapeTarget struct { Static bool Addr string Namespace string Pod string Container string } func (t ScrapeTarget) path() string { path := fmt.Sprintf("%s/%s", t.Namespace, t.Pod) if t.Container != "" { path = fmt.Sprintf("%s/%s", path, t.Container) } return path } func (t ScrapeTarget) String() string { return fmt.Sprintf("%s => %s/%s/%s", t.Addr, t.Namespace, t.Pod, t.Container) } func (t ScrapeTarget) Equals(other ScrapeTarget) bool { return t.Addr == other.Addr && t.Namespace == other.Namespace && t.Pod == other.Pod && t.Container == other.Container && t.Static == other.Static } type Scraper struct { opts ScraperOpts podInformer *k8s.PodInformer informerRegistration cache.ResourceEventHandlerRegistration requestTransformer *transform.RequestTransformer remoteClients []remote.RemoteWriteClient scrapeClient *MetricsClient seriesCreator *seriesCreator healthChecker interface{ IsHealthy() bool } wg sync.WaitGroup cancel context.CancelFunc mu sync.RWMutex targets map[string]ScrapeTarget } func NewScraper(opts *ScraperOpts) *Scraper { return &Scraper{ podInformer: opts.PodInformer, opts: *opts, seriesCreator: &seriesCreator{}, requestTransformer: opts.RequestTransformer(), remoteClients: opts.RemoteClients, healthChecker: opts.HealthChecker, targets: make(map[string]ScrapeTarget), } } func (s *Scraper) Open(ctx context.Context) error { logger.Infof("Starting prometheus scraper for node %s", s.opts.NodeName) ctx, cancelFn := context.WithCancel(ctx) s.cancel = cancelFn var err error s.scrapeClient, err = NewMetricsClient(ClientOpts{ ScrapeTimeOut: s.opts.ScrapeTimeout, NodeName: s.opts.NodeName, }) if err != nil { return fmt.Errorf("failed to create metrics client: %w", err) } // Add static targets for _, target := range s.opts.Targets { logger.Infof("Adding static target %s", target) s.targets[target.path()] = target } if !s.opts.DisableDiscovery { s.informerRegistration, err = s.podInformer.Add(ctx, s) if err != nil { return fmt.Errorf("failed to add pod informer: %w", err) } } // Discover the initial targets running on the node s.wg.Add(1) go s.scrape(ctx) go s.resync(ctx) return nil } func (s *Scraper) Close() error { s.scrapeClient.Close() s.cancel() if !s.opts.DisableDiscovery { s.podInformer.Remove(s.informerRegistration) } s.informerRegistration = nil s.wg.Wait() return nil } func (s *Scraper) scrape(ctx context.Context) { defer s.wg.Done() // Sleep for a random amount of time to avoid thundering herd select { case <-ctx.Done(): return case <-time.After(time.Duration(rand.Intn(int(s.opts.ScrapeInterval.Seconds()))) * time.Second): // continue } reconnectTimer := time.NewTicker(5 * time.Minute) defer reconnectTimer.Stop() t := time.NewTicker(s.opts.ScrapeInterval) defer t.Stop() for { select { case <-ctx.Done(): return case <-t.C: if !s.healthChecker.IsHealthy() { logger.Warnf("Collector is unhealthy, skipping scrape") continue } s.scrapeTargets(ctx) case <-reconnectTimer.C: for _, remoteClient := range s.remoteClients { remoteClient.CloseIdleConnections() } } } } func (s *Scraper) scrapeTargets(ctx context.Context) { targets := s.Targets() scrapeTime := time.Now().UnixNano() / 1e6 wr := prompb.WriteRequestPool.Get() defer prompb.WriteRequestPool.Put(wr) for _, target := range targets { logger.Infof("Scraping %s", target.String()) iter, err := s.scrapeClient.FetchMetricsIterator(target.Addr) if err != nil { logger.Warnf("Failed to create scrape iterator %s/%s/%s at %s: %s", target.Namespace, target.Pod, target.Container, target.Addr, err.Error()) continue } for iter.Next() { pt := prompb.TimeSeriesPool.Get() ts, err := iter.TimeSeriesInto(pt) if err != nil { logger.Warnf("Failed to parse series %s/%s/%s at %s: %s", target.Namespace, target.Pod, target.Container, target.Addr, err.Error()) continue } name := prompb.MetricName(ts) if s.requestTransformer.ShouldDropMetric(ts, name) { prompb.TimeSeriesPool.Put(ts) metrics.MetricsDroppedTotal.WithLabelValues(string(name)).Add(1) continue } for i, s := range ts.Samples { if s.Timestamp == 0 { s.Timestamp = scrapeTime } ts.Samples[i] = s } if target.Namespace != "" { ts.AppendLabelString("adxmon_namespace", target.Namespace) } if target.Pod != "" { ts.AppendLabelString("adxmon_pod", target.Pod) } if target.Container != "" { ts.AppendLabelString("adxmon_container", target.Container) } prompb.Sort(ts.Labels) ts = s.requestTransformer.TransformTimeSeries(ts) wr.Timeseries = append(wr.Timeseries, ts) wr = s.flushBatchIfNecessary(ctx, wr) } if err := iter.Err(); err != nil { logger.Warnf("Failed to scrape %s/%s/%s at %s: %s", target.Namespace, target.Pod, target.Container, target.Addr, err.Error()) } if err := iter.Close(); err != nil { logger.Errorf("Failed to close iterator for %s/%s/%s: %s", target.Namespace, target.Pod, target.Container, err.Error()) } wr = s.flushBatchIfNecessary(ctx, wr) } if err := s.sendBatch(ctx, wr); err != nil { logger.Errorf(err.Error()) } wr.Timeseries = wr.Timeseries[:0] } func (s *Scraper) flushBatchIfNecessary(ctx context.Context, wr *prompb.WriteRequest) *prompb.WriteRequest { filtered := wr if len(filtered.Timeseries) >= s.opts.MaxBatchSize { filtered = s.requestTransformer.TransformWriteRequest(wr) } if len(filtered.Timeseries) >= s.opts.MaxBatchSize { if err := s.sendBatch(ctx, filtered); err != nil { logger.Errorf(err.Error()) } for i := range filtered.Timeseries { ts := filtered.Timeseries[i] prompb.TimeSeriesPool.Put(ts) } filtered.Timeseries = filtered.Timeseries[:0] } return filtered } func (s *Scraper) sendBatch(ctx context.Context, wr *prompb.WriteRequest) error { if len(wr.Timeseries) == 0 { return nil } if len(s.remoteClients) == 0 || logger.IsDebug() { var sb strings.Builder for _, ts := range wr.Timeseries { sb.Reset() for i, l := range ts.Labels { sb.Write(l.Name) sb.WriteString("=") sb.Write(l.Value) if i < len(ts.Labels)-1 { sb.Write([]byte(",")) } } sb.Write([]byte(" ")) for _, s := range ts.Samples { logger.Debugf("%s %d %f", sb.String(), s.Timestamp, s.Value) } } } start := time.Now() defer func() { logger.Infof("Sending %d timeseries to %d endpoints duration=%s", len(wr.Timeseries), len(s.remoteClients), time.Since(start)) }() return remote.WriteRequest(ctx, s.remoteClients, wr) } func (s *Scraper) OnAdd(obj interface{}, isInitialList bool) { p, ok := obj.(*v1.Pod) if !ok || p == nil { return } targets := s.isScrapeable(p) // Not a scrape-able pod if len(targets) == 0 { return } s.mu.Lock() defer s.mu.Unlock() for _, target := range targets { existing, ok := s.targets[string(p.UID)] if ok { if target.Equals(existing) { return } logger.Infof("Updating target %s %s", target.path(), target) } else { logger.Infof("Adding target %s %s", target.path(), target) } s.targets[string(p.UID)] = target } } func (s *Scraper) OnUpdate(oldObj, newObj interface{}) { p, ok := newObj.(*v1.Pod) if !ok || p == nil { return } if p.DeletionTimestamp != nil { s.OnDelete(p) return } s.OnAdd(p, false) } func (s *Scraper) OnDelete(obj interface{}) { p, ok := obj.(*v1.Pod) if !ok || p == nil { return } s.mu.Lock() defer s.mu.Unlock() target, ok := s.targets[string(p.UID)] if !ok { return } logger.Infof("Removing target %s %s", target.path(), target) delete(s.targets, string(p.UID)) } // isScrapeable returns the scrape target endpoints and true if the pod is currently a target, false otherwise func (s *Scraper) isScrapeable(p *v1.Pod) []ScrapeTarget { return makeTargets(p) } func (s *Scraper) Targets() []ScrapeTarget { s.mu.RLock() defer s.mu.RUnlock() a := make([]ScrapeTarget, 0, len(s.targets)) for _, v := range s.targets { a = append(a, v) } sort.Slice(a, func(i, j int) bool { return a[i].path() < a[j].path() }) return a } func (s *Scraper) resync(ctx context.Context) { t := time.NewTicker(30 * time.Second) defer t.Stop() for { select { case <-ctx.Done(): return case <-t.C: pods, err := s.scrapeClient.Pods() if err != nil { logger.Errorf("Failed to list pods: %s", err.Error()) continue } podsOnNode := make(map[string]struct{}) s.mu.Lock() for _, p := range pods.Items { podsOnNode[string(p.UID)] = struct{}{} targets := s.isScrapeable(&p) if len(targets) == 0 { continue } for _, target := range targets { existing, ok := s.targets[string(p.UID)] if ok { if target.Equals(existing) { continue } logger.Infof("Updating target %s %s", target.path(), target) } else { logger.Infof("Adding target %s %s", target.path(), target) } s.targets[string(p.UID)] = target } } for k, target := range s.targets { if target.Static { continue } if _, ok := podsOnNode[k]; !ok { logger.Infof("Removing target %s", k) delete(s.targets, k) } } s.mu.Unlock() } } } func makeTargets(p *v1.Pod) []ScrapeTarget { var targets []ScrapeTarget // Skip the pod if it has not opted in to scraping if !strings.EqualFold(getAnnotationOrDefault(p, "adx-mon/scrape", "false"), "true") { return nil } podIP := p.Status.PodIP if podIP == "" { return nil } scheme := getAnnotationOrDefault(p, "adx-mon/scheme", "http") path := getAnnotationOrDefault(p, "adx-mon/path", "/metrics") // Just scrape this one port port := getAnnotationOrDefault(p, "adx-mon/port", "") // Scrape a comma separated list of targets with the format path:port like /metrics:8080 targetMap := getTargetAnnotationMapOrDefault(p, "adx-mon/targets", make(map[string]string)) for _, c := range p.Spec.Containers { for _, cp := range c.Ports { var readinessPort, livenessPort string if c.ReadinessProbe != nil && c.ReadinessProbe.HTTPGet != nil { readinessPort = c.ReadinessProbe.HTTPGet.Port.String() } if c.LivenessProbe != nil && c.LivenessProbe.HTTPGet != nil { livenessPort = c.LivenessProbe.HTTPGet.Port.String() } // If target list is specified, only scrape those path/port combinations if len(targetMap) != 0 { checkPorts := []string{strconv.Itoa(int(cp.ContainerPort)), readinessPort, livenessPort} for _, checkPort := range checkPorts { // if the current port, liveness port, or readiness port exist in the targetMap, add that to scrape targets if target, added := addTargetFromMap(podIP, scheme, checkPort, p.Namespace, p.Name, c.Name, targetMap); added { targets = append(targets, target) // if all targets are accounted for, return target list if len(targetMap) == 0 { return targets } } } // if there are remaining targets, continue iterating through containers continue } // If a port is specified, only scrape that port on the pod if port != "" { if port != strconv.Itoa(int(cp.ContainerPort)) && port != readinessPort && port != livenessPort { continue } targets = append(targets, ScrapeTarget{ Addr: fmt.Sprintf("%s://%s:%s%s", scheme, podIP, port, path), Namespace: p.Namespace, Pod: p.Name, Container: c.Name, }) return targets } targets = append(targets, ScrapeTarget{ Addr: fmt.Sprintf("%s://%s:%d%s", scheme, podIP, cp.ContainerPort, path), Namespace: p.Namespace, Pod: p.Name, Container: c.Name, }) } } return targets } func parseTargetList(targetList string) (map[string]string, error) { // Split the string by ',' rawTargets := strings.Split(targetList, ",") // Initialize the map m := make(map[string]string) // Iterate over the rawTargets for _, rawTarget := range rawTargets { // Split each rawTarget by ':' targetPair := strings.Split(strings.TrimSpace(rawTarget), ":") if len(targetPair) != 2 { return nil, fmt.Errorf("Using default scrape rules - target list contains malformed grouping: " + rawTarget) } // flipping expected order to ensure that port is the key m[targetPair[1]] = targetPair[0] } return m, nil } func addTargetFromMap(podIP, scheme, port, namespace, pod, container string, targetMap map[string]string) (ScrapeTarget, bool) { if tPath, ok := targetMap[port]; ok { target := ScrapeTarget{ Addr: fmt.Sprintf("%s://%s:%s%s", scheme, podIP, port, tPath), Namespace: namespace, Pod: pod, Container: container, } delete(targetMap, port) return target, true } return ScrapeTarget{}, false } func getAnnotationOrDefault(p *v1.Pod, key, def string) string { if value, ok := p.Annotations[key]; ok && value != "" { return value } return def } func getTargetAnnotationMapOrDefault(p *v1.Pod, key string, defaultVal map[string]string) map[string]string { rawVal, exists := p.Annotations[key] if !exists || rawVal == "" { return defaultVal } parsedMap, err := parseTargetList(rawVal) if err != nil { logger.Warnf(err.Error()) return defaultVal } return parsedMap }