collector/receiver/prometheusreceiver/metrics_receiver.go

// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package prometheusreceiver // import "github.com/GoogleCloudPlatform/run-gmp-sidecar/collector/receiver/prometheusreceiver"

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"os"
	"regexp"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/mitchellh/hashstructure/v2"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery"
	promHTTP "github.com/prometheus/prometheus/discovery/http"
	"github.com/prometheus/prometheus/scrape"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/component/componentstatus"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/receiver"
	"go.uber.org/zap"
	"gopkg.in/yaml.v2"

	"github.com/GoogleCloudPlatform/run-gmp-sidecar/collector/receiver/prometheusreceiver/internal"
)

const (
	defaultGCInterval = 2 * time.Minute
	gcIntervalDelta   = 1 * time.Minute
)

// pReceiver is the type that provides Prometheus scraper/receiver functionality.
type pReceiver struct {
	cfg                 *Config
	consumer            consumer.Metrics
	cancelFunc          context.CancelFunc
	targetAllocatorStop chan struct{}
	configLoaded        chan struct{}
	loadConfigOnce      sync.Once

	settings         receiver.Settings
	scrapeManager    *scrape.Manager
	discoveryManager *discovery.Manager
	registerer       prometheus.Registerer
}

// newPrometheusReceiver creates a new prometheus.Receiver reference.
func newPrometheusReceiver(set receiver.Settings, cfg *Config, next consumer.Metrics) *pReceiver {
	pr := &pReceiver{
		cfg:          cfg,
		consumer:     next,
		settings:     set,
		configLoaded: make(chan struct{}),
		registerer: prometheus.WrapRegistererWith(
			prometheus.Labels{"receiver": set.ID.String()},
			prometheus.DefaultRegisterer),
		targetAllocatorStop: make(chan struct{}),
	}
	return pr
}

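// exampleLifecycle is a purely illustrative sketch, never called from this
// package: it shows how the constructor above and the Start/Shutdown
// lifecycle methods fit together, assuming the collector framework has
// already produced the receiver.Settings, *Config, consumer.Metrics, and
// component.Host arguments. The function name is hypothetical.
func exampleLifecycle(ctx context.Context, set receiver.Settings, cfg *Config, next consumer.Metrics, host component.Host) error {
	recv := newPrometheusReceiver(set, cfg, next)
	// Start spins up discovery, scraping, and (if configured) target
	// allocator polling in background goroutines.
	if err := recv.Start(ctx, host); err != nil {
		return err
	}
	// Shutdown stops scraping after a final scrape attempt.
	return recv.Shutdown(ctx)
}
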
// Start is the method that starts Prometheus scraping. It is driven by the
// Config previously passed to newPrometheusReceiver.
func (r *pReceiver) Start(_ context.Context, host component.Host) error {
	discoveryCtx, cancel := context.WithCancel(context.Background())
	r.cancelFunc = cancel

	logger := internal.NewZapToGokitLogAdapter(r.settings.Logger)

	// add scrape configs defined by the collector configs
	baseCfg := r.cfg.PrometheusConfig

	err := r.initPrometheusComponents(discoveryCtx, logger, host)
	if err != nil {
		r.settings.Logger.Error("Failed to init Prometheus components", zap.Error(err))
		return err
	}

	err = r.applyCfg(baseCfg)
	if err != nil {
		r.settings.Logger.Error("Failed to apply new scrape configuration", zap.Error(err))
		return err
	}

	allocConf := r.cfg.TargetAllocator
	if allocConf != nil {
		err = r.startTargetAllocator(allocConf, baseCfg)
		if err != nil {
			return err
		}
	}

	r.loadConfigOnce.Do(func() {
		close(r.configLoaded)
	})

	return nil
}

func (r *pReceiver) startTargetAllocator(allocConf *targetAllocator, baseCfg *config.Config) error {
	r.settings.Logger.Info("Starting target allocator discovery")
	// immediately sync jobs, not waiting for the first tick
	savedHash, err := r.syncTargetAllocator(uint64(0), allocConf, baseCfg)
	if err != nil {
		return err
	}
	go func() {
		targetAllocatorIntervalTicker := time.NewTicker(allocConf.Interval)
		for {
			select {
			case <-targetAllocatorIntervalTicker.C:
				hash, newErr := r.syncTargetAllocator(savedHash, allocConf, baseCfg)
				if newErr != nil {
					r.settings.Logger.Error(newErr.Error())
					continue
				}
				savedHash = hash
			case <-r.targetAllocatorStop:
				targetAllocatorIntervalTicker.Stop()
				r.settings.Logger.Info("Stopping target allocator")
				return
			}
		}
	}()
	return nil
}

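// exampleScrapeConfigsYAML is a hypothetical illustration, unused by the
// receiver, of the document syncTargetAllocator below expects from the
// target allocator's /scrape_configs endpoint: a YAML map keyed by job
// name whose values unmarshal into *config.ScrapeConfig. The job names
// and intervals are made up; "$(SHARD)" marks where instantiateShard
// performs its substitution before parsing.
const exampleScrapeConfigsYAML = `
app-metrics:
  job_name: app-metrics
  scrape_interval: 30s
  metrics_path: /metrics
sharded-job-$(SHARD):
  job_name: sharded-job-$(SHARD)
  scrape_interval: 1m
`
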
// syncTargetAllocator requests jobs from the target allocator and updates the
// underlying receiver if the response does not match the provided compareHash.
// baseCfg can be used to provide additional ScrapeConfigs, which will be added
// to the retrieved jobs.
func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *targetAllocator, baseCfg *config.Config) (uint64, error) {
	r.settings.Logger.Debug("Syncing target allocator jobs")
	scrapeConfigsResponse, err := r.getScrapeConfigsResponse(allocConf.Endpoint)
	if err != nil {
		r.settings.Logger.Error("Failed to retrieve job list", zap.Error(err))
		return 0, err
	}

	hash, err := hashstructure.Hash(scrapeConfigsResponse, hashstructure.FormatV2, nil)
	if err != nil {
		r.settings.Logger.Error("Failed to hash job list", zap.Error(err))
		return 0, err
	}
	if hash == compareHash {
		// no update needed
		return hash, nil
	}

	// Clear out the current configurations
	baseCfg.ScrapeConfigs = []*config.ScrapeConfig{}

	for jobName, scrapeConfig := range scrapeConfigsResponse {
		var httpSD promHTTP.SDConfig
		if allocConf.HTTPSDConfig == nil {
			httpSD = promHTTP.SDConfig{
				RefreshInterval: model.Duration(30 * time.Second),
			}
		} else {
			httpSD = *allocConf.HTTPSDConfig
		}
		escapedJob := url.QueryEscape(jobName)
		httpSD.URL = fmt.Sprintf("%s/jobs/%s/targets?collector_id=%s", allocConf.Endpoint, escapedJob, allocConf.CollectorID)
		httpSD.HTTPClientConfig.FollowRedirects = false
		scrapeConfig.ServiceDiscoveryConfigs = discovery.Configs{
			&httpSD,
		}
		baseCfg.ScrapeConfigs = append(baseCfg.ScrapeConfigs, scrapeConfig)
	}

	err = r.applyCfg(baseCfg)
	if err != nil {
		r.settings.Logger.Error("Failed to apply new scrape configuration", zap.Error(err))
		return 0, err
	}

	return hash, nil
}

// instantiateShard inserts the SHARD environment variable in the returned configuration
func (r *pReceiver) instantiateShard(body []byte) []byte {
	shard, ok := os.LookupEnv("SHARD")
	if !ok {
		shard = "0"
	}
	return bytes.ReplaceAll(body, []byte("$(SHARD)"), []byte(shard))
}

func (r *pReceiver) getScrapeConfigsResponse(baseURL string) (map[string]*config.ScrapeConfig, error) {
	scrapeConfigsURL := fmt.Sprintf("%s/scrape_configs", baseURL)
	_, err := url.Parse(scrapeConfigsURL) // check if valid
	if err != nil {
		return nil, err
	}

	resp, err := http.Get(scrapeConfigsURL) //nolint
	if err != nil {
		return nil, err
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	jobToScrapeConfig := map[string]*config.ScrapeConfig{}
	envReplacedBody := r.instantiateShard(body)
	err = yaml.Unmarshal(envReplacedBody, &jobToScrapeConfig)
	if err != nil {
		return nil, err
	}
	err = resp.Body.Close()
	if err != nil {
		return nil, err
	}
	return jobToScrapeConfig, nil
}

func (r *pReceiver) applyCfg(cfg *config.Config) error {
	if err := r.scrapeManager.ApplyConfig(cfg); err != nil {
		return err
	}

	discoveryCfg := make(map[string]discovery.Configs)
	for _, scrapeConfig := range cfg.ScrapeConfigs {
		discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs
		r.settings.Logger.Info("Scrape job added", zap.String("jobName", scrapeConfig.JobName))
	}
	if err := r.discoveryManager.ApplyConfig(discoveryCfg); err != nil {
		return err
	}
	return nil
}

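// exampleShardSubstitution is a hypothetical sketch, unused by the
// receiver, that works through instantiateShard above: with SHARD=2 set in
// the environment, every literal "$(SHARD)" in the fetched body is
// rewritten before YAML parsing; with SHARD unset, "0" is used instead.
func exampleShardSubstitution(r *pReceiver) []byte {
	os.Setenv("SHARD", "2") // assumption: set here only for illustration
	// yields []byte("collector_id: collector-2")
	return r.instantiateShard([]byte("collector_id: collector-$(SHARD)"))
}
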
func (r *pReceiver) initPrometheusComponents(ctx context.Context, logger log.Logger, host component.Host) error {
	// Some SD mechanisms use the "refresh" package, which has its own metrics.
	refreshSdMetrics := discovery.NewRefreshMetrics(r.registerer)

	// Register the metrics specific for each SD mechanism, and the ones for the refresh package.
	sdMetrics, err := discovery.RegisterSDMetrics(r.registerer, refreshSdMetrics)
	if err != nil {
		return fmt.Errorf("failed to register service discovery metrics: %w", err)
	}
	r.discoveryManager = discovery.NewManager(ctx, logger, r.registerer, sdMetrics, discovery.SkipInitialWait())
	if r.discoveryManager == nil {
		// NewManager can sometimes return nil if it encountered an error, but
		// the error message is logged separately.
		return fmt.Errorf("failed to create discovery manager")
	}

	go func() {
		r.settings.Logger.Info("Starting discovery manager")
		if err := r.discoveryManager.Run(); err != nil {
			r.settings.Logger.Error("Discovery manager failed", zap.Error(err))
			componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
		}
	}()

	var startTimeMetricRegex *regexp.Regexp
	if r.cfg.AdjusterOpts.StartTimeMetricRegex != "" {
		var err error
		startTimeMetricRegex, err = regexp.Compile(r.cfg.AdjusterOpts.StartTimeMetricRegex)
		if err != nil {
			return err
		}
	}

	store, err := internal.NewAppendable(
		r.consumer,
		r.settings,
		gcInterval(r.cfg.PrometheusConfig),
		r.cfg.AdjusterOpts.UseStartTimeMetric,
		startTimeMetricRegex,
		useCreatedMetricGate.IsEnabled(),
		r.cfg.AdjusterOpts.UseCollectorStartTimeFallback,
		r.cfg.AdjusterOpts.AllowCumulativeResets,
		r.cfg.PrometheusConfig.GlobalConfig.ExternalLabels,
		r.cfg.TrimMetricSuffixes,
	)
	if err != nil {
		return err
	}

	// For the sidecar, use a 10s offset from the start before scraping the targets.
	tenSecondOffSet := 10 * time.Second
	r.scrapeManager, err = scrape.NewManager(&scrape.Options{PassMetadataInContext: true, InitialScrapeOffset: &tenSecondOffSet, DiscoveryReloadOnStartup: true}, logger, store, r.registerer)
	if err != nil {
		return err
	}

	go func() {
		// The scrape manager needs to wait for the configuration to be loaded before beginning
		<-r.configLoaded
		r.settings.Logger.Info("Starting scrape manager")
		r.scrapeManager.Run(r.discoveryManager.SyncCh())
	}()

	return nil
}

// gcInterval returns the longest scrape interval used by a scrape config,
// plus a delta to prevent race conditions.
// This ensures jobs are not garbage collected between scrapes.
func gcInterval(cfg *config.Config) time.Duration {
	gcInterval := defaultGCInterval
	if time.Duration(cfg.GlobalConfig.ScrapeInterval)+gcIntervalDelta > gcInterval {
		gcInterval = time.Duration(cfg.GlobalConfig.ScrapeInterval) + gcIntervalDelta
	}
	for _, scrapeConfig := range cfg.ScrapeConfigs {
		if time.Duration(scrapeConfig.ScrapeInterval)+gcIntervalDelta > gcInterval {
			gcInterval = time.Duration(scrapeConfig.ScrapeInterval) + gcIntervalDelta
		}
	}
	return gcInterval
}

// Shutdown stops and cancels the underlying Prometheus scrapers.
func (r *pReceiver) Shutdown(context.Context) error {
	r.settings.Logger.Info("collector: stopping collector's prometheus receiver")
	if r.scrapeManager != nil {
		r.scrapeManager.StopAfterScrapeAttempt(time.Now())
	}
	if r.cancelFunc != nil {
		r.cancelFunc()
	}
	close(r.targetAllocatorStop)
	r.settings.Logger.Info("collector: final scrape complete. Shutting down rest of pipeline")
	return nil
}

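// gcIntervalExample is a hypothetical sketch, unused by the receiver, that
// works through gcInterval above: the 1m global interval plus the 1m delta
// equals, but does not exceed, the 2m default, so the default would stand
// on its own; the made-up 5m job then raises the result to 6m.
func gcIntervalExample() time.Duration {
	cfg := &config.Config{
		GlobalConfig: config.GlobalConfig{ScrapeInterval: model.Duration(time.Minute)},
		ScrapeConfigs: []*config.ScrapeConfig{
			{JobName: "slow-job", ScrapeInterval: model.Duration(5 * time.Minute)},
		},
	}
	return gcInterval(cfg) // 5m + 1m delta = 6m
}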