otelcollector/prometheusreceiver/metrics_receiver.go (235 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package prometheusreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver"
import (
"context"
"errors"
"fmt"
"log/slog"
"net"
"net/url"
"os"
"reflect"
"regexp"
"sync"
"time"
"unsafe"
"github.com/prometheus/client_golang/prometheus"
commonconfig "github.com/prometheus/common/config"
"github.com/prometheus/common/version"
promconfig "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/web"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"
"go.uber.org/zap/exp/zapslog"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/targetallocator"
)
const (
	// defaultGCInterval is the floor for the metric-store GC interval used
	// when no configured scrape interval (plus delta) exceeds it.
	defaultGCInterval = 2 * time.Minute
	// gcIntervalDelta is added on top of the longest scrape interval so that
	// a job is never garbage collected between two consecutive scrapes.
	gcIntervalDelta = 1 * time.Minute
	// Use same settings as Prometheus web server
	maxConnections     = 512
	readTimeoutMinutes = 10
)
// pReceiver is the type that provides Prometheus scraper/receiver functionality.
type pReceiver struct {
	cfg      *Config          // receiver configuration (scrape config, feature toggles)
	consumer consumer.Metrics // downstream consumer the appendable store writes to
	// cancelFunc stops the background discovery/scrape/web goroutines; set in
	// Start, invoked in Shutdown.
	cancelFunc context.CancelFunc
	// configLoaded is closed (exactly once, via loadConfigOnce) when the
	// config is ready; the scrape-manager goroutine blocks on it before running.
	configLoaded           chan struct{}
	loadConfigOnce         sync.Once
	settings               receiver.Settings
	scrapeManager          *scrape.Manager
	discoveryManager       *discovery.Manager
	targetAllocatorManager *targetallocator.Manager
	// registerer wraps the default registerer with a per-receiver label.
	registerer prometheus.Registerer
	// unregisterMetrics undoes all metric registrations; called in Shutdown.
	unregisterMetrics func()
	skipOffsetting    bool // for testing only
	webHandler        *web.Handler
}
// New creates a new prometheus.Receiver reference.
func newPrometheusReceiver(set receiver.Settings, cfg *Config, next consumer.Metrics) *pReceiver {
	// Snapshot the Prometheus configuration so the target-allocator manager
	// works against its own copy.
	promCfg := promconfig.Config(*cfg.PrometheusConfig)

	// Scope all self-observability metrics to this receiver instance.
	receiverRegisterer := prometheus.WrapRegistererWith(
		prometheus.Labels{"receiver": set.ID.String()},
		prometheus.DefaultRegisterer,
	)

	return &pReceiver{
		cfg:          cfg,
		consumer:     next,
		settings:     set,
		configLoaded: make(chan struct{}),
		registerer:   receiverRegisterer,
		targetAllocatorManager: targetallocator.NewManager(
			set,
			cfg.TargetAllocator,
			&promCfg,
			enableNativeHistogramsGate.IsEnabled(),
		),
	}
}
// Start is the method that starts Prometheus scraping. It
// is controlled by having previously defined a Configuration using perhaps New.
func (r *pReceiver) Start(ctx context.Context, host component.Host) error {
	// The managers must outlive Start's short-lived ctx, so they run under a
	// background-derived context that is cancelled only in Shutdown.
	discoveryCtx, cancel := context.WithCancel(context.Background())
	r.cancelFunc = cancel

	slogger := slog.New(zapslog.NewHandler(r.settings.Logger.Core()))
	if err := r.initPrometheusComponents(discoveryCtx, slogger, host); err != nil {
		r.settings.Logger.Error("Failed to initPrometheusComponents Prometheus components", zap.Error(err))
		return err
	}

	if err := r.targetAllocatorManager.Start(ctx, host, r.scrapeManager, r.discoveryManager, r.webHandler); err != nil {
		return err
	}

	// Unblock the scrape-manager goroutine; guarded so repeated Start calls
	// cannot close the channel twice.
	r.loadConfigOnce.Do(func() { close(r.configLoaded) })
	return nil
}
// initPrometheusComponents builds and starts the Prometheus service-discovery
// manager, scrape manager, and embedded web handler. ctx governs the lifetime
// of the three background goroutines (it is cancelled in Shutdown); any fatal
// error from them is reported to the host via componentstatus.
func (r *pReceiver) initPrometheusComponents(ctx context.Context, logger *slog.Logger, host component.Host) error {
	// Some SD mechanisms use the "refresh" package, which has its own metrics.
	refreshSdMetrics := discovery.NewRefreshMetrics(r.registerer)
	// Register the metrics specific for each SD mechanism, and the ones for the refresh package.
	sdMetrics, err := discovery.RegisterSDMetrics(r.registerer, refreshSdMetrics)
	if err != nil {
		return fmt.Errorf("failed to register service discovery metrics: %w", err)
	}
	r.discoveryManager = discovery.NewManager(ctx, logger, r.registerer, sdMetrics)
	if r.discoveryManager == nil {
		// NewManager can sometimes return nil if it encountered an error, but
		// the error message is logged separately.
		return errors.New("failed to create discovery manager")
	}

	go func() {
		r.settings.Logger.Info("Starting discovery manager")
		// Use a goroutine-local err: assigning to the enclosing function's
		// err from here would be a data race with the rest of this function.
		if err := r.discoveryManager.Run(); err != nil && !errors.Is(err, context.Canceled) {
			r.settings.Logger.Error("Discovery manager failed", zap.Error(err))
			componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
		}
	}()

	var startTimeMetricRegex *regexp.Regexp
	if r.cfg.StartTimeMetricRegex != "" {
		startTimeMetricRegex, err = regexp.Compile(r.cfg.StartTimeMetricRegex)
		if err != nil {
			return err
		}
	}

	store, err := internal.NewAppendable(
		r.consumer,
		r.settings,
		gcInterval(r.cfg.PrometheusConfig),
		r.cfg.UseStartTimeMetric,
		startTimeMetricRegex,
		useCreatedMetricGate.IsEnabled(),
		enableNativeHistogramsGate.IsEnabled(),
		r.cfg.PrometheusConfig.GlobalConfig.ExternalLabels,
		r.cfg.TrimMetricSuffixes,
	)
	if err != nil {
		return err
	}

	opts := &scrape.Options{
		PassMetadataInContext: true,
		ExtraMetrics:          r.cfg.ReportExtraScrapeMetrics,
		HTTPClientOptions: []commonconfig.HTTPClientOption{
			commonconfig.WithUserAgent(r.settings.BuildInfo.Command + "/" + r.settings.BuildInfo.Version),
		},
		EnableCreatedTimestampZeroIngestion: true,
	}
	if enableNativeHistogramsGate.IsEnabled() {
		opts.EnableNativeHistogramsIngestion = true
	}

	// for testing only: scrape.Options.skipOffsetting is unexported upstream,
	// so it has to be flipped via reflection + unsafe.
	if r.skipOffsetting {
		optsValue := reflect.ValueOf(opts).Elem()
		field := optsValue.FieldByName("skipOffsetting")
		reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).
			Elem().
			Set(reflect.ValueOf(true))
	}

	scrapeManager, err := scrape.NewManager(opts, logger, nil, store, r.registerer)
	if err != nil {
		return err
	}
	r.scrapeManager = scrapeManager

	// Undo every registration performed above; invoked from Shutdown.
	r.unregisterMetrics = func() {
		refreshSdMetrics.Unregister()
		for _, sdMetric := range sdMetrics {
			sdMetric.Unregister()
		}
		r.discoveryManager.UnregisterMetrics()
		r.scrapeManager.UnregisterMetrics()
	}

	go func() {
		// The scrape manager needs to wait for the configuration to be loaded before beginning
		<-r.configLoaded
		r.settings.Logger.Info("Starting scrape manager")
		if err := r.scrapeManager.Run(r.discoveryManager.SyncCh()); err != nil {
			r.settings.Logger.Error("Scrape manager failed", zap.Error(err))
			componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
		}
	}()

	// Setup settings and logger and create Prometheus web handler
	const webListenAddr = "localhost:9090"
	webOptions := web.Options{
		ScrapeManager:   r.scrapeManager,
		Context:         ctx,
		ListenAddresses: []string{webListenAddr},
		ExternalURL: &url.URL{
			Scheme: "http",
			Host:   webListenAddr,
			Path:   "",
		},
		RoutePrefix: "/",
		ReadTimeout: time.Minute * readTimeoutMinutes,
		PageTitle:   "Prometheus Receiver",
		Version: &web.PrometheusVersion{
			Version:   version.Version,
			Revision:  version.Revision,
			Branch:    version.Branch,
			BuildUser: version.BuildUser,
			BuildDate: version.BuildDate,
			GoVersion: version.GoVersion,
		},
		Flags:          make(map[string]string),
		MaxConnections: maxConnections,
		IsAgent:        true,
		Gatherer:       prometheus.DefaultGatherer,
		UseOldUI:       true,
	}
	webLogger := slog.New(slog.NewTextHandler(os.Stderr, nil))
	r.webHandler = web.New(webLogger, &webOptions)
	// The semaphore bounds the number of concurrently accepted connections.
	sem := make(chan struct{}, maxConnections)
	listener, err := r.webHandler.Listener(webListenAddr, sem)
	if err != nil {
		return err
	}
	// Pass config and let the web handler know the config is ready.
	// These are needed because Prometheus allows reloading the config without restarting.
	r.webHandler.ApplyConfig((*promconfig.Config)(r.cfg.PrometheusConfig))
	r.webHandler.SetReady(web.Ready)
	// Uses the same context as the discovery and scrape managers for shutting down
	go func() {
		if err := r.webHandler.Run(ctx, []net.Listener{listener}, ""); err != nil {
			r.settings.Logger.Error("Web handler failed", zap.Error(err))
			componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
		}
	}()
	return nil
}
// gcInterval returns the longest scrape interval used by a scrape config,
// plus a delta to prevent race conditions.
// This ensures jobs are not garbage collected between scrapes.
func gcInterval(cfg *PromConfig) time.Duration {
	longest := defaultGCInterval
	// consider raises the result if this interval (plus the safety delta)
	// exceeds the longest seen so far.
	consider := func(interval time.Duration) {
		if candidate := interval + gcIntervalDelta; candidate > longest {
			longest = candidate
		}
	}
	consider(time.Duration(cfg.GlobalConfig.ScrapeInterval))
	for _, scrapeConfig := range cfg.ScrapeConfigs {
		consider(time.Duration(scrapeConfig.ScrapeInterval))
	}
	return longest
}
// Shutdown stops and cancels the underlying Prometheus scrapers.
// The order matters: cancel the shared context first (stops discovery and the
// web handler), then stop scraping, then the target allocator, and finally
// unregister all self-observability metrics. Every step is nil-guarded so
// Shutdown is safe even if Start failed partway through (or never ran).
func (r *pReceiver) Shutdown(context.Context) error {
	if r.cancelFunc != nil {
		r.cancelFunc()
	}
	if r.scrapeManager != nil {
		r.scrapeManager.Stop()
	}
	if r.targetAllocatorManager != nil {
		r.targetAllocatorManager.Shutdown()
	}
	if r.unregisterMetrics != nil {
		r.unregisterMetrics()
	}
	return nil
}