receiver/prometheusreceiver/metrics_receiver.go (363 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package prometheusreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver" import ( "context" "errors" "fmt" "log/slog" "net/http" "net/url" "os" "reflect" "regexp" "runtime" "runtime/debug" "strings" "sync" "time" "unsafe" grafanaRegexp "github.com/grafana/regexp" "github.com/mwitkow/go-conntrack" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" commonconfig "github.com/prometheus/common/config" "github.com/prometheus/common/promslog" "github.com/prometheus/common/route" "github.com/prometheus/common/version" toolkit_web "github.com/prometheus/exporter-toolkit/web" promconfig "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/httputil" "github.com/prometheus/prometheus/web" api_v1 "github.com/prometheus/prometheus/web/api/v1" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.uber.org/zap" "go.uber.org/zap/exp/zapslog" "golang.org/x/net/netutil" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/targetallocator" ) const ( defaultGCInterval = 2 * time.Minute gcIntervalDelta = 1 * time.Minute // Use same settings as Prometheus web server maxConnections = 512 readTimeoutMinutes = 10 ) // pReceiver is the type that provides Prometheus scraper/receiver functionality. type pReceiver struct { cfg *Config consumer consumer.Metrics cancelFunc context.CancelFunc configLoaded chan struct{} loadConfigOnce sync.Once settings receiver.Settings scrapeManager *scrape.Manager discoveryManager *discovery.Manager targetAllocatorManager *targetallocator.Manager apiServer *http.Server registry *prometheus.Registry registerer prometheus.Registerer unregisterMetrics func() skipOffsetting bool // for testing only } // New creates a new prometheus.Receiver reference. func newPrometheusReceiver(set receiver.Settings, cfg *Config, next consumer.Metrics) *pReceiver { baseCfg := promconfig.Config(*cfg.PrometheusConfig) registry := prometheus.NewRegistry() registerer := prometheus.WrapRegistererWith( prometheus.Labels{"receiver": set.ID.String()}, registry) pr := &pReceiver{ cfg: cfg, consumer: next, settings: set, configLoaded: make(chan struct{}), registerer: registerer, registry: registry, targetAllocatorManager: targetallocator.NewManager( set, cfg.TargetAllocator, &baseCfg, enableNativeHistogramsGate.IsEnabled(), ), } return pr } // Start is the method that starts Prometheus scraping. It // is controlled by having previously defined a Configuration using perhaps New. func (r *pReceiver) Start(ctx context.Context, host component.Host) error { discoveryCtx, cancel := context.WithCancel(context.Background()) r.cancelFunc = cancel logger := slog.New(zapslog.NewHandler(r.settings.Logger.Core())) err := r.initPrometheusComponents(discoveryCtx, logger, host) if err != nil { r.settings.Logger.Error("Failed to initPrometheusComponents Prometheus components", zap.Error(err)) return err } err = r.targetAllocatorManager.Start(ctx, host, r.scrapeManager, r.discoveryManager) if err != nil { return err } if r.cfg.APIServer != nil && r.cfg.APIServer.Enabled { err = r.initAPIServer(discoveryCtx, host) if err != nil { r.settings.Logger.Error("Failed to initAPIServer", zap.Error(err)) } } r.loadConfigOnce.Do(func() { close(r.configLoaded) }) return nil } func (r *pReceiver) initPrometheusComponents(ctx context.Context, logger *slog.Logger, host component.Host) error { // Some SD mechanisms use the "refresh" package, which has its own metrics. refreshSdMetrics := discovery.NewRefreshMetrics(r.registerer) // Register the metrics specific for each SD mechanism, and the ones for the refresh package. sdMetrics, err := discovery.RegisterSDMetrics(r.registerer, refreshSdMetrics) if err != nil { return fmt.Errorf("failed to register service discovery metrics: %w", err) } r.discoveryManager = discovery.NewManager(ctx, logger, r.registerer, sdMetrics) if r.discoveryManager == nil { // NewManager can sometimes return nil if it encountered an error, but // the error message is logged separately. return errors.New("failed to create discovery manager") } go func() { r.settings.Logger.Info("Starting discovery manager") if err = r.discoveryManager.Run(); err != nil && !errors.Is(err, context.Canceled) { r.settings.Logger.Error("Discovery manager failed", zap.Error(err)) componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err)) } }() var startTimeMetricRegex *regexp.Regexp if r.cfg.StartTimeMetricRegex != "" { startTimeMetricRegex, err = regexp.Compile(r.cfg.StartTimeMetricRegex) if err != nil { return err } } store, err := internal.NewAppendable( r.consumer, r.settings, gcInterval(r.cfg.PrometheusConfig), r.cfg.UseStartTimeMetric, startTimeMetricRegex, useCreatedMetricGate.IsEnabled(), enableNativeHistogramsGate.IsEnabled(), r.cfg.PrometheusConfig.GlobalConfig.ExternalLabels, r.cfg.TrimMetricSuffixes, ) if err != nil { return err } opts := &scrape.Options{ PassMetadataInContext: true, ExtraMetrics: r.cfg.ReportExtraScrapeMetrics, HTTPClientOptions: []commonconfig.HTTPClientOption{ commonconfig.WithUserAgent(r.settings.BuildInfo.Command + "/" + r.settings.BuildInfo.Version), }, EnableCreatedTimestampZeroIngestion: true, } if enableNativeHistogramsGate.IsEnabled() { opts.EnableNativeHistogramsIngestion = true } // for testing only if r.skipOffsetting { optsValue := reflect.ValueOf(opts).Elem() field := optsValue.FieldByName("skipOffsetting") reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())). Elem(). Set(reflect.ValueOf(true)) } scrapeManager, err := scrape.NewManager(opts, logger, nil, store, r.registerer) if err != nil { return err } r.scrapeManager = scrapeManager r.unregisterMetrics = func() { refreshSdMetrics.Unregister() for _, sdMetric := range sdMetrics { sdMetric.Unregister() } r.discoveryManager.UnregisterMetrics() r.scrapeManager.UnregisterMetrics() } go func() { // The scrape manager needs to wait for the configuration to be loaded before beginning <-r.configLoaded r.settings.Logger.Info("Starting scrape manager") if err := r.scrapeManager.Run(r.discoveryManager.SyncCh()); err != nil { r.settings.Logger.Error("Scrape manager failed", zap.Error(err)) componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err)) } }() return nil } func (r *pReceiver) initAPIServer(ctx context.Context, host component.Host) error { r.settings.Logger.Info("Starting Prometheus API server") // If allowed CORS origins are provided in the receiver config, combine them into a single regex since the Prometheus API server requires this format. var corsOriginRegexp *grafanaRegexp.Regexp if r.cfg.APIServer.ServerConfig.CORS != nil && len(r.cfg.APIServer.ServerConfig.CORS.AllowedOrigins) > 0 { var combinedOriginsBuilder strings.Builder combinedOriginsBuilder.WriteString(r.cfg.APIServer.ServerConfig.CORS.AllowedOrigins[0]) for _, origin := range r.cfg.APIServer.ServerConfig.CORS.AllowedOrigins[1:] { combinedOriginsBuilder.WriteString("|") combinedOriginsBuilder.WriteString(origin) } combinedRegexp, err := grafanaRegexp.Compile(combinedOriginsBuilder.String()) if err != nil { return fmt.Errorf("failed to compile combined CORS allowed origins into regex: %s", err.Error()) } corsOriginRegexp = combinedRegexp } // If read timeout is not set in the receiver config, use the default Prometheus value. readTimeout := r.cfg.APIServer.ServerConfig.ReadTimeout if readTimeout == 0 { readTimeout = time.Duration(readTimeoutMinutes) * time.Minute } o := &web.Options{ ScrapeManager: r.scrapeManager, Context: ctx, ListenAddresses: []string{r.cfg.APIServer.ServerConfig.Endpoint}, ExternalURL: &url.URL{ Scheme: "http", Host: r.cfg.APIServer.ServerConfig.Endpoint, Path: "", }, RoutePrefix: "/", ReadTimeout: readTimeout, PageTitle: "Prometheus Receiver", Flags: make(map[string]string), MaxConnections: maxConnections, IsAgent: true, Registerer: r.registerer, Gatherer: r.registry, CORSOrigin: corsOriginRegexp, } // Creates the API object in the same way as the Prometheus web package: https://github.com/prometheus/prometheus/blob/6150e1ca0ede508e56414363cc9062ef522db518/web/web.go#L314-L354 // Anything not defined by the options above will be nil, such as o.QueryEngine, o.Storage, etc. IsAgent=true, so these being nil is expected by Prometheus. factorySPr := func(_ context.Context) api_v1.ScrapePoolsRetriever { return o.ScrapeManager } factoryTr := func(_ context.Context) api_v1.TargetRetriever { return o.ScrapeManager } factoryAr := func(_ context.Context) api_v1.AlertmanagerRetriever { return nil } factoryRr := func(_ context.Context) api_v1.RulesRetriever { return nil } var app storage.Appendable logger := promslog.NewNopLogger() apiV1 := api_v1.NewAPI(o.QueryEngine, o.Storage, app, o.ExemplarStorage, factorySPr, factoryTr, factoryAr, // This ensures that any changes to the config made, even by the target allocator, are reflected in the API. func() promconfig.Config { return *(*promconfig.Config)(r.cfg.PrometheusConfig) }, o.Flags, // nil api_v1.GlobalURLOptions{ ListenAddress: o.ListenAddresses[0], Host: o.ExternalURL.Host, Scheme: o.ExternalURL.Scheme, }, func(f http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { f(w, r) } }, o.LocalStorage, // nil o.TSDBDir, // nil o.EnableAdminAPI, // nil logger, factoryRr, o.RemoteReadSampleLimit, // nil o.RemoteReadConcurrencyLimit, // nil o.RemoteReadBytesInFrame, // nil o.IsAgent, o.CORSOrigin, func() (api_v1.RuntimeInfo, error) { status := api_v1.RuntimeInfo{ GoroutineCount: runtime.NumGoroutine(), GOMAXPROCS: runtime.GOMAXPROCS(0), GOMEMLIMIT: debug.SetMemoryLimit(-1), GOGC: os.Getenv("GOGC"), GODEBUG: os.Getenv("GODEBUG"), } return status, nil }, &web.PrometheusVersion{ Version: version.Version, Revision: version.Revision, Branch: version.Branch, BuildUser: version.BuildUser, BuildDate: version.BuildDate, GoVersion: version.GoVersion, }, o.NotificationsGetter, o.NotificationsSub, o.Gatherer, o.Registerer, nil, o.EnableRemoteWriteReceiver, o.AcceptRemoteWriteProtoMsgs, o.EnableOTLPWriteReceiver, ) // Create listener and monitor with conntrack in the same way as the Prometheus web package: https://github.com/prometheus/prometheus/blob/6150e1ca0ede508e56414363cc9062ef522db518/web/web.go#L564-L579 listener, err := r.cfg.APIServer.ServerConfig.ToListener(ctx) if err != nil { return fmt.Errorf("failed to create listener: %s", err.Error()) } listener = netutil.LimitListener(listener, o.MaxConnections) listener = conntrack.NewListener(listener, conntrack.TrackWithName("http"), conntrack.TrackWithTracing()) // Run the API server in the same way as the Prometheus web package: https://github.com/prometheus/prometheus/blob/6150e1ca0ede508e56414363cc9062ef522db518/web/web.go#L582-L630 mux := http.NewServeMux() promHandler := promhttp.HandlerFor(o.Gatherer, promhttp.HandlerOpts{Registry: o.Registerer}) mux.Handle("/metrics", promHandler) // This is the path the web package uses, but the router above with no prefix can also be Registered by apiV1 instead. apiPath := "/api" if o.RoutePrefix != "/" { apiPath = o.RoutePrefix + apiPath logger.Info("Router prefix", "prefix", o.RoutePrefix) } av1 := route.New(). WithInstrumentation(setPathWithPrefix(apiPath + "/v1")) apiV1.Register(av1) mux.Handle(apiPath+"/v1/", http.StripPrefix(apiPath+"/v1", av1)) spanNameFormatter := otelhttp.WithSpanNameFormatter(func(_ string, r *http.Request) string { return fmt.Sprintf("%s %s", r.Method, r.URL.Path) }) r.apiServer, err = r.cfg.APIServer.ServerConfig.ToServer(ctx, host, r.settings.TelemetrySettings, otelhttp.NewHandler(mux, "", spanNameFormatter)) if err != nil { return err } webconfig := "" go func() { if err := toolkit_web.Serve(listener, r.apiServer, &toolkit_web.FlagConfig{WebConfigFile: &webconfig}, logger); err != nil { r.settings.Logger.Error("API server failed", zap.Error(err)) } }() return nil } // Helper function from the Prometheus web package: https://github.com/prometheus/prometheus/blob/6150e1ca0ede508e56414363cc9062ef522db518/web/web.go#L582-L630 func setPathWithPrefix(prefix string) func(handlerName string, handler http.HandlerFunc) http.HandlerFunc { return func(_ string, handler http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { handler(w, r.WithContext(httputil.ContextWithPath(r.Context(), prefix+r.URL.Path))) } } } // gcInterval returns the longest scrape interval used by a scrape config, // plus a delta to prevent race conditions. // This ensures jobs are not garbage collected between scrapes. func gcInterval(cfg *PromConfig) time.Duration { gcInterval := defaultGCInterval if time.Duration(cfg.GlobalConfig.ScrapeInterval)+gcIntervalDelta > gcInterval { gcInterval = time.Duration(cfg.GlobalConfig.ScrapeInterval) + gcIntervalDelta } for _, scrapeConfig := range cfg.ScrapeConfigs { if time.Duration(scrapeConfig.ScrapeInterval)+gcIntervalDelta > gcInterval { gcInterval = time.Duration(scrapeConfig.ScrapeInterval) + gcIntervalDelta } } return gcInterval } // Shutdown stops and cancels the underlying Prometheus scrapers. func (r *pReceiver) Shutdown(ctx context.Context) error { if r.cancelFunc != nil { r.cancelFunc() } if r.scrapeManager != nil { r.scrapeManager.Stop() } if r.targetAllocatorManager != nil { r.targetAllocatorManager.Shutdown() } if r.unregisterMetrics != nil { r.unregisterMetrics() } if r.apiServer != nil { err := r.apiServer.Shutdown(ctx) if err != nil { return err } } return nil }