// collector/receiver/prometheusreceiver/metrics_receiver.go
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package prometheusreceiver // import "github.com/GoogleCloudPlatform/run-gmp-sidecar/collector/receiver/prometheusreceiver"

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/url"
"os"
"regexp"
"sync"
"time"
"github.com/go-kit/log"
"github.com/mitchellh/hashstructure/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
promHTTP "github.com/prometheus/prometheus/discovery/http"
"github.com/prometheus/prometheus/scrape"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"
"gopkg.in/yaml.v2"
"github.com/GoogleCloudPlatform/run-gmp-sidecar/collector/receiver/prometheusreceiver/internal"
)

const (
defaultGCInterval = 2 * time.Minute
gcIntervalDelta = 1 * time.Minute
)

// pReceiver is the type that provides Prometheus scraper/receiver functionality.
type pReceiver struct {
cfg *Config
consumer consumer.Metrics
cancelFunc context.CancelFunc
targetAllocatorStop chan struct{}
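	// configLoaded is closed (guarded by loadConfigOnce) once the initial
	// configuration has been applied; the scrape manager waits for it before
	// starting to scrape.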
configLoaded chan struct{}
loadConfigOnce sync.Once
settings receiver.Settings
scrapeManager *scrape.Manager
discoveryManager *discovery.Manager
registerer prometheus.Registerer
}

// newPrometheusReceiver creates a new pReceiver that scrapes Prometheus
// targets and delivers the resulting metrics to the next consumer.
func newPrometheusReceiver(set receiver.Settings, cfg *Config, next consumer.Metrics) *pReceiver {
pr := &pReceiver{
cfg: cfg,
consumer: next,
settings: set,
configLoaded: make(chan struct{}),
registerer: prometheus.WrapRegistererWith(
prometheus.Labels{"receiver": set.ID.String()},
prometheus.DefaultRegisterer),
targetAllocatorStop: make(chan struct{}),
}
return pr
}

// Start begins Prometheus scraping using the configuration that was provided
// when the receiver was created.
func (r *pReceiver) Start(_ context.Context, host component.Host) error {
discoveryCtx, cancel := context.WithCancel(context.Background())
r.cancelFunc = cancel
logger := internal.NewZapToGokitLogAdapter(r.settings.Logger)
// add scrape configs defined by the collector configs
baseCfg := r.cfg.PrometheusConfig
err := r.initPrometheusComponents(discoveryCtx, logger, host)
if err != nil {
r.settings.Logger.Error("Failed to initPrometheusComponents Prometheus components", zap.Error(err))
return err
}
err = r.applyCfg(baseCfg)
if err != nil {
r.settings.Logger.Error("Failed to apply new scrape configuration", zap.Error(err))
return err
}
allocConf := r.cfg.TargetAllocator
if allocConf != nil {
err = r.startTargetAllocator(allocConf, baseCfg)
if err != nil {
return err
}
}
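
	// Unblock the scrape manager, which waits for the initial configuration
	// to be applied before it starts scraping.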
r.loadConfigOnce.Do(func() {
close(r.configLoaded)
})
return nil
}
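
// startTargetAllocator performs an initial sync against the target allocator,
// then launches a goroutine that re-syncs on every tick of the configured
// interval until Shutdown closes targetAllocatorStop.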
func (r *pReceiver) startTargetAllocator(allocConf *targetAllocator, baseCfg *config.Config) error {
r.settings.Logger.Info("Starting target allocator discovery")
// immediately sync jobs, not waiting for the first tick
savedHash, err := r.syncTargetAllocator(uint64(0), allocConf, baseCfg)
if err != nil {
return err
}
go func() {
targetAllocatorIntervalTicker := time.NewTicker(allocConf.Interval)
for {
select {
case <-targetAllocatorIntervalTicker.C:
hash, newErr := r.syncTargetAllocator(savedHash, allocConf, baseCfg)
if newErr != nil {
r.settings.Logger.Error(newErr.Error())
continue
}
savedHash = hash
case <-r.targetAllocatorStop:
targetAllocatorIntervalTicker.Stop()
r.settings.Logger.Info("Stopping target allocator")
return
}
}
}()
return nil
}

// syncTargetAllocator requests jobs from the target allocator and updates the
// underlying receiver if the response hash does not match compareHash.
// baseCfg can be used to provide additional ScrapeConfigs, which are added to
// the retrieved jobs. It returns the hash of the retrieved scrape configs.
func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *targetAllocator, baseCfg *config.Config) (uint64, error) {
r.settings.Logger.Debug("Syncing target allocator jobs")
scrapeConfigsResponse, err := r.getScrapeConfigsResponse(allocConf.Endpoint)
if err != nil {
r.settings.Logger.Error("Failed to retrieve job list", zap.Error(err))
return 0, err
}
hash, err := hashstructure.Hash(scrapeConfigsResponse, hashstructure.FormatV2, nil)
if err != nil {
r.settings.Logger.Error("Failed to hash job list", zap.Error(err))
return 0, err
}
if hash == compareHash {
// no update needed
return hash, nil
}
// Clear out the current configurations
baseCfg.ScrapeConfigs = []*config.ScrapeConfig{}
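
	// Rebuild each job's scrape config with an HTTP service discovery section
	// that points at the allocator's per-job targets endpoint.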
for jobName, scrapeConfig := range scrapeConfigsResponse {
var httpSD promHTTP.SDConfig
if allocConf.HTTPSDConfig == nil {
httpSD = promHTTP.SDConfig{
RefreshInterval: model.Duration(30 * time.Second),
}
} else {
httpSD = *allocConf.HTTPSDConfig
}
escapedJob := url.QueryEscape(jobName)
httpSD.URL = fmt.Sprintf("%s/jobs/%s/targets?collector_id=%s", allocConf.Endpoint, escapedJob, allocConf.CollectorID)
httpSD.HTTPClientConfig.FollowRedirects = false
scrapeConfig.ServiceDiscoveryConfigs = discovery.Configs{
&httpSD,
}
baseCfg.ScrapeConfigs = append(baseCfg.ScrapeConfigs, scrapeConfig)
}
err = r.applyCfg(baseCfg)
if err != nil {
r.settings.Logger.Error("Failed to apply new scrape configuration", zap.Error(err))
return 0, err
}
return hash, nil
}

// instantiateShard replaces the $(SHARD) placeholder in the given
// configuration body with the value of the SHARD environment variable,
// defaulting to "0" when the variable is unset.
func (r *pReceiver) instantiateShard(body []byte) []byte {
shard, ok := os.LookupEnv("SHARD")
if !ok {
shard = "0"
}
return bytes.ReplaceAll(body, []byte("$(SHARD)"), []byte(shard))
}

// getScrapeConfigsResponse fetches the scrape configs from the target
// allocator's /scrape_configs endpoint and unmarshals them into a map keyed
// by job name.
func (r *pReceiver) getScrapeConfigsResponse(baseURL string) (map[string]*config.ScrapeConfig, error) {
	scrapeConfigsURL := fmt.Sprintf("%s/scrape_configs", baseURL)
	_, err := url.Parse(scrapeConfigsURL) // check if valid
	if err != nil {
		return nil, err
	}
	resp, err := http.Get(scrapeConfigsURL) //nolint
	if err != nil {
		return nil, err
	}
	// Close the body on every exit path so the connection is not leaked when
	// reading or unmarshaling fails.
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	jobToScrapeConfig := map[string]*config.ScrapeConfig{}
	envReplacedBody := r.instantiateShard(body)
	if err = yaml.Unmarshal(envReplacedBody, &jobToScrapeConfig); err != nil {
		return nil, err
	}
	return jobToScrapeConfig, nil
}
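
// applyCfg applies the given scrape configuration to the scrape manager and
// hands each job's service discovery configuration to the discovery manager.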
func (r *pReceiver) applyCfg(cfg *config.Config) error {
if err := r.scrapeManager.ApplyConfig(cfg); err != nil {
return err
}
discoveryCfg := make(map[string]discovery.Configs)
for _, scrapeConfig := range cfg.ScrapeConfigs {
discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs
r.settings.Logger.Info("Scrape job added", zap.String("jobName", scrapeConfig.JobName))
}
if err := r.discoveryManager.ApplyConfig(discoveryCfg); err != nil {
return err
}
return nil
}
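
// initPrometheusComponents creates the discovery and scrape managers and
// starts them in background goroutines.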
func (r *pReceiver) initPrometheusComponents(ctx context.Context, logger log.Logger, host component.Host) error {
// Some SD mechanisms use the "refresh" package, which has its own metrics.
refreshSdMetrics := discovery.NewRefreshMetrics(r.registerer)
// Register the metrics specific for each SD mechanism, and the ones for the refresh package.
sdMetrics, err := discovery.RegisterSDMetrics(r.registerer, refreshSdMetrics)
if err != nil {
return fmt.Errorf("failed to register service discovery metrics: %w", err)
}
r.discoveryManager = discovery.NewManager(ctx, logger, r.registerer, sdMetrics, discovery.SkipInitialWait())
if r.discoveryManager == nil {
// NewManager can sometimes return nil if it encountered an error, but
// the error message is logged separately.
return fmt.Errorf("failed to create discovery manager")
}
go func() {
r.settings.Logger.Info("Starting discovery manager")
if err := r.discoveryManager.Run(); err != nil {
r.settings.Logger.Error("Discovery manager failed", zap.Error(err))
componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
}
}()
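
	// Compile the optional regex used to identify the start-time metric when
	// start-time adjustment is enabled.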
var startTimeMetricRegex *regexp.Regexp
if r.cfg.AdjusterOpts.StartTimeMetricRegex != "" {
var err error
startTimeMetricRegex, err = regexp.Compile(r.cfg.AdjusterOpts.StartTimeMetricRegex)
if err != nil {
return err
}
}
store, err := internal.NewAppendable(
r.consumer,
r.settings,
gcInterval(r.cfg.PrometheusConfig),
r.cfg.AdjusterOpts.UseStartTimeMetric,
startTimeMetricRegex,
useCreatedMetricGate.IsEnabled(),
r.cfg.AdjusterOpts.UseCollectorStartTimeFallback,
r.cfg.AdjusterOpts.AllowCumulativeResets,
r.cfg.PrometheusConfig.GlobalConfig.ExternalLabels,
r.cfg.TrimMetricSuffixes,
)
if err != nil {
return err
}
	// For the sidecar, delay the first scrape of each target by 10 seconds
	// after startup.
	tenSecondOffSet := 10 * time.Second
r.scrapeManager, err = scrape.NewManager(&scrape.Options{PassMetadataInContext: true, InitialScrapeOffset: &tenSecondOffSet, DiscoveryReloadOnStartup: true}, logger, store, r.registerer)
if err != nil {
return err
}
go func() {
// The scrape manager needs to wait for the configuration to be loaded before beginning
<-r.configLoaded
r.settings.Logger.Info("Starting scrape manager")
r.scrapeManager.Run(r.discoveryManager.SyncCh())
}()
return nil
}

// gcInterval returns the longest scrape interval used by a scrape config,
// plus a delta to prevent race conditions.
// This ensures jobs are not garbage collected between scrapes.
func gcInterval(cfg *config.Config) time.Duration {
gcInterval := defaultGCInterval
if time.Duration(cfg.GlobalConfig.ScrapeInterval)+gcIntervalDelta > gcInterval {
gcInterval = time.Duration(cfg.GlobalConfig.ScrapeInterval) + gcIntervalDelta
}
for _, scrapeConfig := range cfg.ScrapeConfigs {
if time.Duration(scrapeConfig.ScrapeInterval)+gcIntervalDelta > gcInterval {
gcInterval = time.Duration(scrapeConfig.ScrapeInterval) + gcIntervalDelta
}
}
return gcInterval
}

// Shutdown stops and cancels the underlying Prometheus scrapers.
func (r *pReceiver) Shutdown(context.Context) error {
r.settings.Logger.Info("collector: stopping collector's prometheus receiver")
if r.scrapeManager != nil {
r.scrapeManager.StopAfterScrapeAttempt(time.Now())
}
if r.cancelFunc != nil {
r.cancelFunc()
}
close(r.targetAllocatorStop)
r.settings.Logger.Info("collector: final scrape complete. Shutting down rest of pipeline")
return nil
}