internal/pkg/api/metrics.go (239 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package api

import (
	"context"
	"errors"
	"fmt"
	"sync"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/rs/zerolog"
	apmprometheus "go.elastic.co/apm/module/apmprometheus/v2"
	"go.elastic.co/apm/v2"

	"github.com/elastic/elastic-agent-libs/api"
	cfglib "github.com/elastic/elastic-agent-libs/config"
	"github.com/elastic/elastic-agent-libs/monitoring"
	"github.com/elastic/elastic-agent-system-metrics/report"

	"github.com/elastic/fleet-server/v7/internal/pkg/build"
	"github.com/elastic/fleet-server/v7/internal/pkg/config"
	"github.com/elastic/fleet-server/v7/internal/pkg/dl"
	"github.com/elastic/fleet-server/v7/internal/pkg/limit"
	"github.com/elastic/fleet-server/v7/internal/pkg/logger"
	"github.com/elastic/fleet-server/v7/version"
)

// Package-level metric handles. The pointers below are assigned, and the
// route stats registered, by init.
var (
	// registry is the root "http_server" registry; every metric below is
	// created in it or in a child registry derived from it.
	registry *metricsRegistry

	// Connection-level metrics: tcp_open, tcp_close and tcp_active.
	cntHTTPNew    *statsCounter
	cntHTTPClose  *statsCounter
	cntHTTPActive *statsGauge

	// Per-route metric sets, each registered under the "routes" child registry.
	cntCheckin       routeStats
	cntEnroll        routeStats
	cntAcks          routeStats
	cntStatus        routeStats
	cntUploadStart   routeStats
	cntUploadChunk   routeStats
	cntUploadEnd     routeStats
	cntFileDeliv     routeStats
	cntGetPGP        routeStats
	cntAuditUnenroll routeStats
	// cntArtifacts carries extra artifact-specific counters on top of routeStats.
	cntArtifacts artifactStats

	// infoReg guards the one-time registration of the prometheus
	// service_info metric in attachPrometheusEndpoint.
	infoReg sync.Once
)

// init initializes all metrics that fleet-server collects.
// Metrics must be explicitly exposed with a call to InitMetrics.
// FIXME we have global metrics but an internal and external API; this may lead to some confusion.
func init() { err := report.SetupMetrics(logger.NewZapStub("instance-metrics"), build.ServiceName, version.DefaultVersion) if err != nil { zerolog.Ctx(context.TODO()).Error().Err(err).Msg("unable to initialize metrics") // TODO is used because this may logged during the package load } registry = newMetricsRegistry("http_server") cntHTTPNew = newCounter(registry, "tcp_open") cntHTTPClose = newCounter(registry, "tcp_close") cntHTTPActive = newGauge(registry, "tcp_active") routesRegistry := registry.newRegistry("routes") cntCheckin.Register(routesRegistry.newRegistry("checkin")) cntEnroll.Register(routesRegistry.newRegistry("enroll")) cntArtifacts.Register(routesRegistry.newRegistry("artifacts")) cntAcks.Register(routesRegistry.newRegistry("acks")) cntStatus.Register(routesRegistry.newRegistry("status")) cntUploadStart.Register(routesRegistry.newRegistry("uploadStart")) cntUploadChunk.Register(routesRegistry.newRegistry("uploadChunk")) cntUploadEnd.Register(routesRegistry.newRegistry("uploadEnd")) cntFileDeliv.Register(routesRegistry.newRegistry("deliverFile")) cntGetPGP.Register(routesRegistry.newRegistry("getPGPKey")) cntAuditUnenroll.Register(routesRegistry.newRegistry("auditUnenroll")) } // metricsRegistry wraps libbeat and prometheus registries type metricsRegistry struct { fullName string registry *monitoring.Registry promReg *prometheus.Registry } func newMetricsRegistry(name string) *metricsRegistry { reg := monitoring.Default return &metricsRegistry{ fullName: name, registry: reg.NewRegistry(name), promReg: prometheus.NewRegistry(), } } func (r *metricsRegistry) newRegistry(name string) *metricsRegistry { fullName := name if r.fullName != "" { fullName = r.fullName + "_" + name } return &metricsRegistry{ fullName: fullName, registry: r.registry.NewRegistry(name), promReg: r.promReg, } } // statsGauge wraps gauges for internal libbeat and prometheus type statsGauge struct { metric *monitoring.Uint gauge prometheus.Gauge } func newGauge(registry 
*metricsRegistry, name string) *statsGauge { g := prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: registry.fullName, Name: name, }) registry.promReg.MustRegister(g) return &statsGauge{ metric: monitoring.NewUint(registry.registry, name), gauge: g, } } func (g *statsGauge) Add(delta uint64) { g.metric.Add(delta) g.gauge.Add(float64(delta)) } func (g *statsGauge) Inc() { g.metric.Inc() g.gauge.Inc() } func (g *statsGauge) Dec() { g.metric.Dec() g.gauge.Dec() } // statsCounter wraps counters for internal libbeat and prometheus type statsCounter struct { metric *monitoring.Uint counter prometheus.Counter } func newCounter(registry *metricsRegistry, name string) *statsCounter { c := prometheus.NewCounter(prometheus.CounterOpts{ Namespace: registry.fullName, Name: name, }) registry.promReg.MustRegister(c) return &statsCounter{ metric: monitoring.NewUint(registry.registry, name), counter: c, } } func (g *statsCounter) Add(delta uint64) { g.metric.Add(delta) g.counter.Add(float64(delta)) } func (g *statsCounter) Inc() { g.metric.Inc() g.counter.Inc() } // routeStats is the generic collection metrics that we collect per API route. 
type routeStats struct { active *statsGauge total *statsCounter rateLimit *statsCounter maxLimit *statsCounter failure *statsCounter drop *statsCounter bodyIn *statsCounter bodyOut *statsCounter } func (rt *routeStats) Register(registry *metricsRegistry) { rt.active = newGauge(registry, "active") rt.total = newCounter(registry, "total") rt.rateLimit = newCounter(registry, "limit_rate") rt.maxLimit = newCounter(registry, "limit_max") rt.failure = newCounter(registry, "fail") rt.drop = newCounter(registry, "drop") rt.bodyIn = newCounter(registry, "body_in") rt.bodyOut = newCounter(registry, "body_out") } func (rt *routeStats) IncError(err error) { switch { case errors.Is(err, limit.ErrRateLimit): rt.rateLimit.Inc() case errors.Is(err, limit.ErrMaxLimit): rt.maxLimit.Inc() case errors.Is(err, context.Canceled): rt.drop.Inc() default: rt.failure.Inc() } } func (rt *routeStats) IncStart() func() { rt.total.Inc() rt.active.Inc() return rt.active.Dec } // artifactStats is the collection of metrics we collect for the artifact route. type artifactStats struct { routeStats notFound *statsCounter throttle *statsCounter } func (rt *artifactStats) Register(registry *metricsRegistry) { rt.routeStats.Register(registry) rt.notFound = newCounter(registry, "not_found") rt.throttle = newCounter(registry, "throttle") } func (rt *artifactStats) IncError(err error) { switch { case errors.Is(err, dl.ErrNotFound): rt.notFound.Inc() case errors.Is(err, ErrorThrottle): rt.throttle.Inc() default: rt.routeStats.IncError(err) } } // InitMetrics initializes metrics exposure mechanisms. // If tracer is not nil, prometheus metrics are shipped through the tracer. // If cfg.http.enabled is true a /stats endpoint is created to expose libbeat metrics and a /metrics endpoint is created to expose prometheus metrics on the specified interface. 
func InitMetrics(ctx context.Context, cfg *config.Config, bi build.Info, tracer *apm.Tracer) (*api.Server, error) { if tracer != nil { tracer.RegisterMetricsGatherer(apmprometheus.Wrap(registry.promReg)) } reg := monitoring.GetNamespace("info").GetRegistry() if reg.Get("version") == nil { monitoring.NewString(reg, "version").Set(bi.Version) } if reg.Get("name") == nil { monitoring.NewString(reg, "name").Set(build.ServiceName) } if !cfg.HTTP.Enabled { return nil, nil } // Start local api server; largely for metrics. zapStub := logger.NewZapStub("fleet-metrics") cfgStub, err := cfglib.NewConfigFrom(&cfg.HTTP) if err != nil { return nil, err } s, err := api.NewWithDefaultRoutes(zapStub, cfgStub, monitoring.GetNamespace) if err != nil { return nil, fmt.Errorf("could not start the HTTP server for the API: %w", err) } attachPrometheusEndpoint(s, registry.promReg, bi) s.Start() return s, err } type metricsRouter interface { AddRoute(string, api.HandlerFunc) } func attachPrometheusEndpoint(router metricsRouter, reg *prometheus.Registry, bi build.Info) { // do not attempt to re-register the metric on metrics restart. // NOTE we may want to move this block earlier in InitMetrics so the tracer can ship it? infoReg.Do(func() { prometheusInfo := prometheus.NewCounter(prometheus.CounterOpts{ Name: "service_info", Help: "Service information", ConstLabels: prometheus.Labels{ "version": bi.Version, "name": build.ServiceName, }, }) reg.MustRegister(prometheusInfo) prometheusInfo.Inc() }) h := promhttp.HandlerFor(reg, promhttp.HandlerOpts{}) router.AddRoute("/metrics", promhttp.InstrumentMetricHandler(reg, h).ServeHTTP) }