in plugins/inputs/prometheus/start.go [83:312]
// Start builds and runs the embedded Prometheus scrape pipeline and blocks
// until it has shut down.
//
// It creates a scrape discovery manager, a scrape manager that is handed
// receiver as its appendable, and a target allocator manager (only run when
// taManager.enabled is true). The configuration at configFilePath is loaded
// once at start-up and again on every SIGHUP. All long-running pieces are
// registered as actors of a run.Group, so the first actor to return causes
// every other actor to be interrupted.
//
// Parameters:
//   - configFilePath: path of the Prometheus configuration file to (re)load.
//   - receiver: appendable passed to the scrape manager.
//   - shutDownChan: a receive on this channel triggers a graceful shutdown.
//   - wg: wg.Done() is called exactly once when Start returns, on every path.
//   - mth: receives the scrape manager via SetScrapeManager.
//
// If one of the managers cannot be constructed, the error is logged and Start
// returns without running anything.
func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan interface{}, wg *sync.WaitGroup, mth *metricsTypeHandler) {
	// Release the caller's WaitGroup on every exit path, including the early
	// returns taken when a manager cannot be constructed.
	defer wg.Done()

	// The level and format names are hard-coded literals; the results of Set
	// are intentionally discarded.
	logLevel := &promlog.AllowedLevel{}
	_ = logLevel.Set("info")
	if os.Getenv("DEBUG") != "" {
		runtime.SetBlockProfileRate(20)
		runtime.SetMutexProfileFraction(20)
		_ = logLevel.Set("debug")
	}
	logFormat := &promlog.AllowedFormat{}
	_ = logFormat.Set("logfmt")

	cfg := struct {
		configFile    string
		promlogConfig promlog.Config
	}{
		promlogConfig: promlog.Config{Level: logLevel, Format: logFormat},
	}
	cfg.configFile = configFilePath

	logger := promlog.New(&cfg.promlogConfig)
	klog.SetLogger(klogr.New().WithName("k8s_client_runtime").V(6))

	level.Info(logger).Log("msg", "Starting Prometheus", "version", version.Info())
	level.Info(logger).Log("build_context", version.BuildContext())
	level.Info(logger).Log("host_details", promRuntime.Uname())
	level.Info(logger).Log("fd_limits", promRuntime.FdLimits())
	level.Info(logger).Log("vm_limits", promRuntime.VMLimits())

	ctxScrape, cancelScrape := context.WithCancel(context.Background())
	// The discovery manager's interrupt handler below also calls cancelScrape;
	// deferring it here additionally covers the early-return paths. Calling a
	// context.CancelFunc more than once is a no-op.
	defer cancelScrape()

	sdMetrics, err := discovery.CreateAndRegisterSDMetrics(prometheus.DefaultRegisterer)
	if err != nil {
		level.Error(logger).Log("msg", "failed to register service discovery metrics", "err", err)
		return
	}

	discoveryManagerScrape := discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), prometheus.DefaultRegisterer, sdMetrics, discovery.Name("scrape"))
	if discoveryManagerScrape == nil {
		level.Error(logger).Log("msg", "failed to create a discovery manager scrape")
		return
	}

	scrapeManager, err := scrape.NewManager(&scrape.Options{}, log.With(logger, "component", "scrape manager"), receiver, prometheus.DefaultRegisterer)
	if err != nil {
		level.Error(logger).Log("msg", "failed to create a scrape manager", "err", err)
		return
	}

	taManager := createTargetAllocatorManager(configFilePath, log.With(logger, "component", "ta manager"), logLevel, scrapeManager, discoveryManagerScrape)

	level.Info(logger).Log("msg", fmt.Sprintf("Target Allocator is %t", taManager.enabled))
	// Set up the Target Allocator scrape post-process handler: every config it
	// reloads is passed through relabelScrapeConfigs.
	taManager.AttachReloadConfigHandler(
		func(prometheusConfig *config.Config) {
			relabelScrapeConfigs(prometheusConfig, logger)
		},
	)
	mth.SetScrapeManager(scrapeManager)

	var reloaders = []func(cfg *config.Config) error{
		// The scrape manager needs to reload before the discovery manager as
		// it needs to read the most updated config when receiving the new
		// targets list.
		scrapeManager.ApplyConfig,
		func(cfg *config.Config) error {
			c := make(map[string]discovery.Configs)
			for _, v := range cfg.ScrapeConfigs {
				c[v.JobName] = v.ServiceDiscoveryConfigs
			}
			return discoveryManagerScrape.ApplyConfig(c)
		},
	}

	prometheus.MustRegister(configSuccess)
	prometheus.MustRegister(configSuccessTime)

	// sync.Once is used to make sure we can close the channel at different
	// execution stages (shutdown or when the config is loaded).
	type closeOnce struct {
		C     chan struct{}
		once  sync.Once
		Close func()
	}
	// Wait until the server is ready to handle reloading.
	reloadReady := &closeOnce{
		C: make(chan struct{}),
	}
	reloadReady.Close = func() {
		reloadReady.once.Do(func() {
			close(reloadReady.C)
		})
	}

	var g run.Group
	{
		// Termination handler.
		cancel := make(chan struct{})
		g.Add(
			func() error {
				// Release the reloadReady channel on either path so that
				// actors blocked on it can exit normally.
				select {
				case <-shutDownChan:
					level.Warn(logger).Log("msg", "Received ShutDown, exiting gracefully...")
					reloadReady.Close()
				case <-cancel:
					reloadReady.Close()
				}
				return nil
			},
			func(err error) {
				close(cancel)
			},
		)
	}
	{
		// Scrape discovery manager.
		g.Add(
			func() error {
				level.Info(logger).Log("msg", "Scrape discovery manager starting")
				err := discoveryManagerScrape.Run()
				level.Info(logger).Log("msg", "Scrape discovery manager stopped", "error", err)
				return err
			},
			func(err error) {
				level.Info(logger).Log("msg", "Stopping scrape discovery manager...", "error", err)
				cancelScrape()
			},
		)
	}
	{
		// Scrape manager.
		g.Add(
			func() error {
				// When the scrape manager receives a new targets list
				// it needs to read a valid config for each job.
				// It depends on the config being in sync with the discovery manager so
				// we wait until the config is fully loaded.
				<-reloadReady.C
				level.Info(logger).Log("msg", "start discovery")
				err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
				level.Info(logger).Log("msg", "Scrape manager stopped", "error", err)
				return err
			},
			func(err error) {
				level.Info(logger).Log("msg", "Stopping scrape manager...", "error", err)
				scrapeManager.Stop()
			},
		)
	}
	{
		// Target Allocator manager, only registered when it is enabled.
		if taManager.enabled {
			g.Add(
				func() error {
					level.Info(logger).Log("msg", "start ta manager")
					err := taManager.Run()
					level.Info(logger).Log("msg", "ta manager stopped", "error", err)
					return err
				},
				func(err error) {
					level.Info(logger).Log("msg", "Stopping ta manager...", "error", err)
					taManager.Shutdown()
				},
			)
		}
	}
	{
		// Reload handler.
		// The SIGHUP handler is registered here, before the group starts
		// running, so signals arriving during start-up are buffered in hup.
		hup := make(chan os.Signal, 1)
		signal.Notify(hup, syscall.SIGHUP)
		cancel := make(chan struct{})
		g.Add(
			func() error {
				<-reloadReady.C
				for {
					select {
					case <-hup:
						if err := reloadConfig(cfg.configFile, logger, taManager, reloaders...); err != nil {
							level.Error(logger).Log("msg", "Error reloading config", "err", err)
						}
					case <-cancel:
						return nil
					}
				}
			},
			func(err error) {
				// The unbuffered send waits for any in-progress reload to
				// complete, to avoid reloading things after they have been
				// shut down.
				cancel <- struct{}{}
			},
		)
	}
	{
		// Initial configuration loading.
		cancel := make(chan struct{})
		g.Add(
			func() error {
				// In case a shutdown is initiated before loading has started.
				select {
				case <-cancel:
					reloadReady.Close()
					return nil
				default:
				}
				if taManager.enabled {
					// Wait for the target allocator to become ready, but stay
					// responsive to shutdown: without the cancel case this
					// actor would never return if the group is interrupted
					// before taReadyCh is signalled, and g.Run() would hang.
					select {
					case <-taManager.taReadyCh:
					case <-cancel:
						reloadReady.Close()
						return nil
					}
				}
				level.Info(logger).Log("msg", "handling config file")
				if err := reloadConfig(cfg.configFile, logger, taManager, reloaders...); err != nil {
					return errors.Wrapf(err, "error loading config from %q", cfg.configFile)
				}
				level.Info(logger).Log("msg", "finish handling config file")
				reloadReady.Close()
				// Keep this actor alive until the group is interrupted so that
				// a successful load does not itself trigger a shutdown.
				<-cancel
				return nil
			},
			func(err error) {
				close(cancel)
			},
		)
	}

	if err := g.Run(); err != nil {
		level.Error(logger).Log("err", err)
	}
	level.Info(logger).Log("msg", "See you next time!")
}