in otelcollector/prometheusreceiver/metrics_receiver.go [111:251]
func (r *pReceiver) initPrometheusComponents(ctx context.Context, logger *slog.Logger, host component.Host) error {
// Some SD mechanisms use the "refresh" package, which has its own metrics.
refreshSdMetrics := discovery.NewRefreshMetrics(r.registerer)
// Register the metrics specific for each SD mechanism, and the ones for the refresh package.
sdMetrics, err := discovery.RegisterSDMetrics(r.registerer, refreshSdMetrics)
if err != nil {
return fmt.Errorf("failed to register service discovery metrics: %w", err)
}
r.discoveryManager = discovery.NewManager(ctx, logger, r.registerer, sdMetrics)
if r.discoveryManager == nil {
// NewManager can sometimes return nil if it encountered an error, but
// the error message is logged separately.
return errors.New("failed to create discovery manager")
}
go func() {
r.settings.Logger.Info("Starting discovery manager")
if err = r.discoveryManager.Run(); err != nil && !errors.Is(err, context.Canceled) {
r.settings.Logger.Error("Discovery manager failed", zap.Error(err))
componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
}
}()
var startTimeMetricRegex *regexp.Regexp
if r.cfg.StartTimeMetricRegex != "" {
startTimeMetricRegex, err = regexp.Compile(r.cfg.StartTimeMetricRegex)
if err != nil {
return err
}
}
store, err := internal.NewAppendable(
r.consumer,
r.settings,
gcInterval(r.cfg.PrometheusConfig),
r.cfg.UseStartTimeMetric,
startTimeMetricRegex,
useCreatedMetricGate.IsEnabled(),
enableNativeHistogramsGate.IsEnabled(),
r.cfg.PrometheusConfig.GlobalConfig.ExternalLabels,
r.cfg.TrimMetricSuffixes,
)
if err != nil {
return err
}
opts := &scrape.Options{
PassMetadataInContext: true,
ExtraMetrics: r.cfg.ReportExtraScrapeMetrics,
HTTPClientOptions: []commonconfig.HTTPClientOption{
commonconfig.WithUserAgent(r.settings.BuildInfo.Command + "/" + r.settings.BuildInfo.Version),
},
EnableCreatedTimestampZeroIngestion: true,
}
if enableNativeHistogramsGate.IsEnabled() {
opts.EnableNativeHistogramsIngestion = true
}
// for testing only
if r.skipOffsetting {
optsValue := reflect.ValueOf(opts).Elem()
field := optsValue.FieldByName("skipOffsetting")
reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).
Elem().
Set(reflect.ValueOf(true))
}
scrapeManager, err := scrape.NewManager(opts, logger, nil, store, r.registerer)
if err != nil {
return err
}
r.scrapeManager = scrapeManager
r.unregisterMetrics = func() {
refreshSdMetrics.Unregister()
for _, sdMetric := range sdMetrics {
sdMetric.Unregister()
}
r.discoveryManager.UnregisterMetrics()
r.scrapeManager.UnregisterMetrics()
}
go func() {
// The scrape manager needs to wait for the configuration to be loaded before beginning
<-r.configLoaded
r.settings.Logger.Info("Starting scrape manager")
if err := r.scrapeManager.Run(r.discoveryManager.SyncCh()); err != nil {
r.settings.Logger.Error("Scrape manager failed", zap.Error(err))
componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
}
}()
// Setup settings and logger and create Prometheus web handler
webOptions := web.Options{
ScrapeManager: r.scrapeManager,
Context: ctx,
ListenAddresses: []string{"localhost:9090"},
ExternalURL: &url.URL{
Scheme: "http",
Host: "localhost:9090",
Path: "",
},
RoutePrefix: "/",
ReadTimeout: time.Minute * readTimeoutMinutes,
PageTitle: "Prometheus Receiver",
Version: &web.PrometheusVersion{
Version: version.Version,
Revision: version.Revision,
Branch: version.Branch,
BuildUser: version.BuildUser,
BuildDate: version.BuildDate,
GoVersion: version.GoVersion,
},
Flags: make(map[string]string),
MaxConnections: maxConnections,
IsAgent: true,
Gatherer: prometheus.DefaultGatherer,
UseOldUI: true,
}
go_kit_logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
r.webHandler = web.New(go_kit_logger, &webOptions)
sem := make(chan struct{}, maxConnections)
listener, err := r.webHandler.Listener("localhost:9090", sem)
if err != nil {
return err
}
// Pass config and let the web handler know the config is ready.
// These are needed because Prometheus allows reloading the config without restarting.
r.webHandler.ApplyConfig((*promconfig.Config)(r.cfg.PrometheusConfig))
r.webHandler.SetReady(web.Ready)
// Uses the same context as the discovery and scrape managers for shutting down
go func() {
if err := r.webHandler.Run(ctx, []net.Listener{listener}, ""); err != nil {
r.settings.Logger.Error("Web handler failed", zap.Error(err))
componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
}
}()
return nil
}