otelcollector/prometheusreceiver/targetallocator/manager.go

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package targetallocator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/targetallocator"

import (
	"bytes"
	"context"
	"fmt"
	"hash/fnv"
	"io"
	"net/http"
	"net/url"
	"os"
	"sort"
	"time"

	commonconfig "github.com/prometheus/common/config"
	"github.com/prometheus/common/model"
	promconfig "github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery"
	promHTTP "github.com/prometheus/prometheus/discovery/http"
	"github.com/prometheus/prometheus/scrape"
	"github.com/prometheus/prometheus/web"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/receiver"
	"go.uber.org/zap"
	"gopkg.in/yaml.v2"
)

type Manager struct {
	settings               receiver.Settings
	shutdown               chan struct{}
	cfg                    *Config
	promCfg                *promconfig.Config
	initialScrapeConfigs   []*promconfig.ScrapeConfig
	scrapeManager          *scrape.Manager
	discoveryManager       *discovery.Manager
	webHandler             *web.Handler
	enableNativeHistograms bool
}

func NewManager(set receiver.Settings, cfg *Config, promCfg *promconfig.Config, enableNativeHistograms bool) *Manager {
	return &Manager{
		shutdown:               make(chan struct{}),
		settings:               set,
		cfg:                    cfg,
		promCfg:                promCfg,
		initialScrapeConfigs:   promCfg.ScrapeConfigs,
		enableNativeHistograms: enableNativeHistograms,
	}
}

func (m *Manager) Start(ctx context.Context, host component.Host, sm *scrape.Manager, dm *discovery.Manager, wh *web.Handler) error {
	m.scrapeManager = sm
	m.discoveryManager = dm
	m.webHandler = wh
	err := m.applyCfg()
	if err != nil {
		m.settings.Logger.Error("Failed to apply new scrape configuration", zap.Error(err))
		return err
	}
	if m.cfg == nil {
		// the target allocator is disabled
		return nil
	}
	httpClient, err := m.cfg.ClientConfig.ToClient(ctx, host, m.settings.TelemetrySettings)
	if err != nil {
		m.settings.Logger.Error("Failed to create http client", zap.Error(err))
		return err
	}
	m.settings.Logger.Info("Starting target allocator discovery")
	// immediately sync jobs, not waiting for the first tick
	savedHash, err := m.sync(uint64(0), httpClient)
	if err != nil {
		return err
	}
	go func() {
		targetAllocatorIntervalTicker := time.NewTicker(m.cfg.Interval)
		for {
			select {
			case <-targetAllocatorIntervalTicker.C:
				hash, newErr := m.sync(savedHash, httpClient)
				if newErr != nil {
					m.settings.Logger.Error(newErr.Error())
					continue
				}
				savedHash = hash
			case <-m.shutdown:
				targetAllocatorIntervalTicker.Stop()
				m.settings.Logger.Info("Stopping target allocator")
				return
			}
		}
	}()
	return nil
}

func (m *Manager) Shutdown() {
	close(m.shutdown)
}
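// The function below is an editorial usage sketch, not part of the original
// receiver source: it illustrates how a caller could drive the Manager
// lifecycle, assuming the scrape manager, discovery manager, and web handler
// have already been constructed elsewhere. The name exampleRun and all of its
// parameters are hypothetical.
func exampleRun(ctx context.Context, set receiver.Settings, cfg *Config, promCfg *promconfig.Config, host component.Host, sm *scrape.Manager, dm *discovery.Manager, wh *web.Handler) error {
	// NewManager keeps a reference to promCfg and snapshots its initial
	// scrape configs so each sync can re-apply them before appending the
	// jobs retrieved from the target allocator.
	m := NewManager(set, cfg, promCfg, false)
	// Start applies the initial configuration, performs one immediate sync,
	// and then polls the target allocator every cfg.Interval in a goroutine.
	if err := m.Start(ctx, host, sm, dm, wh); err != nil {
		return err
	}
	// Shutdown closes the shutdown channel, which stops the polling
	// goroutine; a real receiver would call this on component shutdown.
	defer m.Shutdown()
	return nil
}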
// sync requests jobs from the target allocator and updates the underlying
// receiver if the response does not match the provided compareHash. The
// receiver's initial scrape configs are preserved, and the retrieved jobs
// are appended to them.
func (m *Manager) sync(compareHash uint64, httpClient *http.Client) (uint64, error) {
	m.settings.Logger.Debug("Syncing target allocator jobs")
	scrapeConfigsResponse, err := getScrapeConfigsResponse(httpClient, m.cfg.Endpoint)
	if err != nil {
		m.settings.Logger.Error("Failed to retrieve job list", zap.Error(err))
		return 0, err
	}

	hash, err := getScrapeConfigHash(scrapeConfigsResponse)
	if err != nil {
		m.settings.Logger.Error("Failed to hash job list", zap.Error(err))
		return 0, err
	}
	if hash == compareHash {
		// no update needed
		return hash, nil
	}

	// Copy initial scrape configurations
	initialConfig := make([]*promconfig.ScrapeConfig, len(m.initialScrapeConfigs))
	copy(initialConfig, m.initialScrapeConfigs)

	m.promCfg.ScrapeConfigs = initialConfig

	for jobName, scrapeConfig := range scrapeConfigsResponse {
		var httpSD promHTTP.SDConfig
		if m.cfg.HTTPSDConfig == nil {
			httpSD = promHTTP.SDConfig{
				RefreshInterval: model.Duration(30 * time.Second),
			}
		} else {
			httpSD = promHTTP.SDConfig(*m.cfg.HTTPSDConfig)
		}
		escapedJob := url.QueryEscape(jobName)
		httpSD.URL = fmt.Sprintf("%s/jobs/%s/targets?collector_id=%s", m.cfg.Endpoint, escapedJob, m.cfg.CollectorID)

		err = configureSDHTTPClientConfigFromTA(&httpSD, m.cfg)
		if err != nil {
			m.settings.Logger.Error("Failed to configure http client config", zap.Error(err))
			return 0, err
		}

		httpSD.HTTPClientConfig.FollowRedirects = false
		scrapeConfig.ServiceDiscoveryConfigs = discovery.Configs{
			&httpSD,
		}

		if m.cfg.HTTPScrapeConfig != nil {
			scrapeConfig.HTTPClientConfig = commonconfig.HTTPClientConfig(*m.cfg.HTTPScrapeConfig)
		}

		m.promCfg.ScrapeConfigs = append(m.promCfg.ScrapeConfigs, scrapeConfig)
	}

	err = m.applyCfg()
	if err != nil {
		m.settings.Logger.Error("Failed to apply new scrape configuration", zap.Error(err))
		return 0, err
	}

	return hash, nil
}
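// Editorial note, not in the original source: for each job returned by the
// allocator, sync above points Prometheus HTTP service discovery at a per-job
// targets endpoint. With the hypothetical values Endpoint
// "https://target-allocator:443", CollectorID "collector-0", and job name
// "kubernetes pods", the query-escaped URL comes out as:
//
//	https://target-allocator:443/jobs/kubernetes+pods/targets?collector_id=collector-0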
func (m *Manager) applyCfg() error {
	scrapeConfigs, err := m.promCfg.GetScrapeConfigs()
	if err != nil {
		return fmt.Errorf("could not get scrape configs: %w", err)
	}

	if !m.enableNativeHistograms {
		// Enforce scraping classic histograms to avoid dropping them.
		for _, scrapeConfig := range m.promCfg.ScrapeConfigs {
			scrapeConfig.AlwaysScrapeClassicHistograms = true
		}
	}

	if err := m.scrapeManager.ApplyConfig(m.promCfg); err != nil {
		return err
	}
	if err := m.webHandler.ApplyConfig(m.promCfg); err != nil {
		return err
	}

	discoveryCfg := make(map[string]discovery.Configs)
	for _, scrapeConfig := range scrapeConfigs {
		discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs
		m.settings.Logger.Info("Scrape job added", zap.String("jobName", scrapeConfig.JobName))
	}
	return m.discoveryManager.ApplyConfig(discoveryCfg)
}

func getScrapeConfigsResponse(httpClient *http.Client, baseURL string) (map[string]*promconfig.ScrapeConfig, error) {
	scrapeConfigsURL := baseURL + "/scrape_configs"
	_, err := url.Parse(scrapeConfigsURL) // check if valid
	if err != nil {
		return nil, err
	}

	resp, err := httpClient.Get(scrapeConfigsURL)
	if err != nil {
		return nil, err
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	jobToScrapeConfig := map[string]*promconfig.ScrapeConfig{}
	envReplacedBody := instantiateShard(body)
	err = yaml.Unmarshal(envReplacedBody, &jobToScrapeConfig)
	if err != nil {
		return nil, err
	}
	err = resp.Body.Close()
	if err != nil {
		return nil, err
	}
	return jobToScrapeConfig, nil
}

// instantiateShard inserts the SHARD environment variable in the returned configuration
func instantiateShard(body []byte) []byte {
	shard, ok := os.LookupEnv("SHARD")
	if !ok {
		shard = "0"
	}
	return bytes.ReplaceAll(body, []byte("$(SHARD)"), []byte(shard))
}

// Calculate a hash for a scrape config map.
// This is done by marshaling to YAML because it's the most straightforward
// and doesn't run into problems with unexported fields.
func getScrapeConfigHash(jobToScrapeConfig map[string]*promconfig.ScrapeConfig) (uint64, error) {
	var err error
	hash := fnv.New64()
	yamlEncoder := yaml.NewEncoder(hash)

	jobKeys := make([]string, 0, len(jobToScrapeConfig))
	for jobName := range jobToScrapeConfig {
		jobKeys = append(jobKeys, jobName)
	}
	// Sort the job names so the hash does not depend on map iteration order.
	sort.Strings(jobKeys)

	for _, jobName := range jobKeys {
		_, err = hash.Write([]byte(jobName))
		if err != nil {
			return 0, err
		}
		err = yamlEncoder.Encode(jobToScrapeConfig[jobName])
		if err != nil {
			return 0, err
		}
	}
	yamlEncoder.Close()
	return hash.Sum64(), err
}
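// Editorial sketch, not in the original source: instantiateShard above does a
// plain byte-level replacement before YAML parsing, so with SHARD=2 set in
// the environment a hypothetical response fragment
//
//	my-job:
//	  job_name: my-job-$(SHARD)
//
// is rewritten to
//
//	my-job:
//	  job_name: my-job-2
//
// and the placeholder falls back to "0" when SHARD is unset.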