plugins/inputs/prometheus/start.go (302 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: MIT // The following code is based on Prometheus project https://github.com/prometheus/prometheus/blob/master/cmd/prometheus/main.go // and we did modification to remove the logic related to flag handling, Rule manager, TSDB, Web handler, and Notifier. // Copyright 2015 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package prometheus import ( "context" "fmt" "os" "os/signal" "runtime" "sync" "syscall" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/oklog/run" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" v "github.com/prometheus/client_golang/prometheus/collectors/version" "github.com/prometheus/common/model" "github.com/prometheus/common/promlog" "github.com/prometheus/common/version" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" _ "github.com/prometheus/prometheus/discovery/install" "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/storage" promRuntime "github.com/prometheus/prometheus/util/runtime" "k8s.io/klog/v2" "k8s.io/klog/v2/klogr" ) var ( configSuccess = prometheus.NewGauge(prometheus.GaugeOpts{ Name: "prometheus_config_last_reload_successful", Help: "Whether the last configuration reload attempt was successful.", }) configSuccessTime = prometheus.NewGauge(prometheus.GaugeOpts{ Name: "prometheus_config_last_reload_success_timestamp_seconds", Help: "Timestamp of the last successful configuration reload.", }) ) var ( // Save name before re-label since customers can relabel the prometheus metric name // https://github.com/aws/amazon-cloudwatch-agent/issues/190 // and we would not able to get the metric type for the metric // and result in dropping the metrics if it is unknown // https://github.com/aws/amazon-cloudwatch-agent/blob/main/plugins/inputs/prometheus_scraper/metrics_filter.go#L23 metricNameRelabelConfigs = []*relabel.Config{ { Action: relabel.Replace, Regex: relabel.MustNewRegexp("(.*)"), Replacement: "$1", TargetLabel: savedScrapeNameLabel, SourceLabels: model.LabelNames{"__name__"}, }, } ) func init() { prometheus.MustRegister(v.NewCollector("prometheus")) } func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan interface{}, wg *sync.WaitGroup, mth *metricsTypeHandler) { logLevel := &promlog.AllowedLevel{} logLevel.Set("info") if os.Getenv("DEBUG") != "" { runtime.SetBlockProfileRate(20) runtime.SetMutexProfileFraction(20) logLevel.Set("debug") } logFormat := &promlog.AllowedFormat{} _ = logFormat.Set("logfmt") cfg := struct { configFile string promlogConfig promlog.Config }{ promlogConfig: promlog.Config{Level: logLevel, Format: logFormat}, } cfg.configFile = configFilePath logger := promlog.New(&cfg.promlogConfig) klog.SetLogger(klogr.New().WithName("k8s_client_runtime").V(6)) level.Info(logger).Log("msg", "Starting Prometheus", "version", version.Info()) level.Info(logger).Log("build_context", version.BuildContext()) level.Info(logger).Log("host_details", promRuntime.Uname()) level.Info(logger).Log("fd_limits", promRuntime.FdLimits()) level.Info(logger).Log("vm_limits", promRuntime.VMLimits()) var ( ctxScrape, cancelScrape = context.WithCancel(context.Background()) sdMetrics, _ = discovery.CreateAndRegisterSDMetrics(prometheus.DefaultRegisterer) discoveryManagerScrape = discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), prometheus.DefaultRegisterer, sdMetrics, discovery.Name("scrape")) scrapeManager, _ = scrape.NewManager(&scrape.Options{}, log.With(logger, "component", "scrape manager"), receiver, prometheus.DefaultRegisterer) taManager = createTargetAllocatorManager(configFilePath, log.With(logger, "component", "ta manager"), logLevel, scrapeManager, discoveryManagerScrape) ) level.Info(logger).Log("msg", fmt.Sprintf("Target Allocator is %t", taManager.enabled)) //Setup Target Allocator Scrape Post Process Handler taManager.AttachReloadConfigHandler( func(prometheusConfig *config.Config) { relabelScrapeConfigs(prometheusConfig, logger) }, ) mth.SetScrapeManager(scrapeManager) var reloaders = []func(cfg *config.Config) error{ // The Scrape and notifier managers need to reload before the Discovery manager as // they need to read the most updated config when receiving the new targets list. scrapeManager.ApplyConfig, func(cfg *config.Config) error { c := make(map[string]discovery.Configs) for _, v := range cfg.ScrapeConfigs { c[v.JobName] = v.ServiceDiscoveryConfigs } return discoveryManagerScrape.ApplyConfig(c) }, } prometheus.MustRegister(configSuccess) prometheus.MustRegister(configSuccessTime) // sync.Once is used to make sure we can close the channel at different execution stages(SIGTERM or when the config is loaded). type closeOnce struct { C chan struct{} once sync.Once Close func() } // Wait until the server is ready to handle reloading. reloadReady := &closeOnce{ C: make(chan struct{}), } reloadReady.Close = func() { reloadReady.once.Do(func() { close(reloadReady.C) }) } var g run.Group { // Termination handler. cancel := make(chan struct{}) g.Add( func() error { // Don't forget to release the reloadReady channel so that waiting blocks can exit normally. select { case <-shutDownChan: level.Warn(logger).Log("msg", "Received ShutDown, exiting gracefully...") reloadReady.Close() case <-cancel: reloadReady.Close() break } return nil }, func(err error) { close(cancel) }, ) } { // Scrape discovery manager. g.Add( func() error { level.Info(logger).Log("msg", "Scrape discovery manager starting") err := discoveryManagerScrape.Run() level.Info(logger).Log("msg", "Scrape discovery manager stopped", "error", err) return err }, func(err error) { level.Info(logger).Log("msg", "Stopping scrape discovery manager...", "error", err) cancelScrape() }, ) } { // Scrape manager. g.Add( func() error { // When the scrape manager receives a new targets list // it needs to read a valid config for each job. // It depends on the config being in sync with the discovery manager so // we wait until the config is fully loaded. <-reloadReady.C level.Info(logger).Log("msg", "start discovery") err := scrapeManager.Run(discoveryManagerScrape.SyncCh()) level.Info(logger).Log("msg", "Scrape manager stopped", "error", err) return err }, func(err error) { // Scrape manager needs to be stopped before closing the local TSDB // so that it doesn't try to write samples to a closed storage. level.Info(logger).Log("msg", "Stopping scrape manager...", "error", err) scrapeManager.Stop() }, ) } { // Target Allocator manager. if taManager.enabled { g.Add( func() error { // we wait until the config is fully loaded. level.Info(logger).Log("msg", "start ta manager") err := taManager.Run() level.Info(logger).Log("msg", "ta manager stopped", "error", err) return err }, func(err error) { level.Info(logger).Log("msg", "Stopping ta manager...", "error", err) taManager.Shutdown() }, ) } } { // Reload handler. // Make sure that sighup handler is registered with a redirect to the channel before the potentially // long and synchronous tsdb init. hup := make(chan os.Signal, 1) signal.Notify(hup, syscall.SIGHUP) cancel := make(chan struct{}) g.Add( func() error { <-reloadReady.C for { select { case <-hup: if err := reloadConfig(cfg.configFile, logger, taManager, reloaders...); err != nil { level.Error(logger).Log("msg", "Error reloading config", "err", err) } case <-cancel: return nil } } }, func(err error) { // Wait for any in-progress reloads to complete to avoid // reloading things after they have been shutdown. cancel <- struct{}{} }, ) } { // Initial configuration loading. cancel := make(chan struct{}) g.Add( func() error { select { // In case a shutdown is initiated before the dbOpen is released case <-cancel: reloadReady.Close() return nil default: } if taManager.enabled { <-taManager.taReadyCh } level.Info(logger).Log("msg", "handling config file") if err := reloadConfig(cfg.configFile, logger, taManager, reloaders...); err != nil { return errors.Wrapf(err, "error loading config from %q", cfg.configFile) } level.Info(logger).Log("msg", "finish handling config file") reloadReady.Close() <-cancel return nil }, func(err error) { close(cancel) }, ) } if err := g.Run(); err != nil { level.Error(logger).Log("err", err) } level.Info(logger).Log("msg", "See you next time!") wg.Done() } const ( savedScrapeJobLabel = "cwagent_saved_scrape_job" savedScrapeInstanceLabel = "cwagent_saved_scrape_instance" scrapeInstanceLabel = "__address__" savedScrapeNameLabel = "cwagent_saved_scrape_name" // just arbitrary name that end user won't override in relabel config ) func relabelScrapeConfigs(prometheusConfig *config.Config, logger log.Logger) { // For saving name before relabel // - __name__ https://github.com/aws/amazon-cloudwatch-agent/issues/190 // - job and instance https://github.com/aws/amazon-cloudwatch-agent/issues/193 for _, sc := range prometheusConfig.ScrapeConfigs { relabelConfigs := []*relabel.Config{ // job { Action: relabel.Replace, Regex: relabel.MustNewRegexp(".*"), // __address__ is always there, so we will find a match for every job Replacement: sc.JobName, // value is hard coded job name SourceLabels: model.LabelNames{"__address__"}, TargetLabel: savedScrapeJobLabel, // creates a new magic label }, // instance { Action: relabel.Replace, Regex: relabel.MustNewRegexp("(.*)"), Replacement: "$1", // value is actual __address__, i.e. instance if you don't relabel it. SourceLabels: model.LabelNames{"__address__"}, TargetLabel: savedScrapeInstanceLabel, // creates a new magic label }, } level.Debug(logger).Log("msg", "Add extra relabel_configs and metric_relabel_configs to save job, instance and __name__ before user relabel") sc.RelabelConfigs = append(relabelConfigs, sc.RelabelConfigs...) sc.MetricRelabelConfigs = append(metricNameRelabelConfigs, sc.MetricRelabelConfigs...) } } func reloadConfig(filename string, logger log.Logger, taManager *TargetAllocatorManager, rls ...func(*config.Config) error) (err error) { level.Info(logger).Log("msg", "Loading configuration file", "filename", filename) content, _ := os.ReadFile(filename) text := string(content) level.Debug(logger).Log("msg", "Prometheus configuration file", "value", text) defer func() { if err == nil { configSuccess.Set(1) configSuccessTime.SetToCurrentTime() } else { configSuccess.Set(0) } }() // Check for TA var conf *config.Config if taManager.enabled { level.Info(logger).Log("msg", "Target Allocator is enabled") conf = (*config.Config)(taManager.config.PrometheusConfig) } else { conf, err = config.LoadFile(filename, false, false, logger) if err != nil { return errors.Wrapf(err, "couldn't load configuration (--config.file=%q)", filename) } } relabelScrapeConfigs(conf, logger) failed := false for _, rl := range rls { if err := rl(conf); err != nil { level.Error(logger).Log("msg", "Failed to apply configuration", "err", err) failed = true } } if failed { return errors.Errorf("one or more errors occurred while applying the new configuration (--config.file=%q)", filename) } level.Info(logger).Log("msg", "Completed loading of configuration file", "filename", filename) return nil }