func Start()

in plugins/inputs/prometheus/start.go [83:312]
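
Start boots an embedded Prometheus scrape pipeline: it sets up logging, builds the service discovery, scrape, and Target Allocator managers, registers config reloaders, and then runs everything as a group of actors until a shutdown is requested on shutDownChan.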


func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan interface{}, wg *sync.WaitGroup, mth *metricsTypeHandler) {
	logLevel := &promlog.AllowedLevel{}
	logLevel.Set("info")

	// DEBUG enables block/mutex profiling and raises the log level to debug.
	if os.Getenv("DEBUG") != "" {
		runtime.SetBlockProfileRate(20)
		runtime.SetMutexProfileFraction(20)
		logLevel.Set("debug")
	}
	logFormat := &promlog.AllowedFormat{}
	_ = logFormat.Set("logfmt")

	cfg := struct {
		configFile    string
		promlogConfig promlog.Config
	}{
		promlogConfig: promlog.Config{Level: logLevel, Format: logFormat},
	}

	cfg.configFile = configFilePath

	logger := promlog.New(&cfg.promlogConfig)

	// Route Kubernetes client-go (klog) output through a structured logger named "k8s_client_runtime".
	klog.SetLogger(klogr.New().WithName("k8s_client_runtime").V(6))

	level.Info(logger).Log("msg", "Starting Prometheus", "version", version.Info())
	level.Info(logger).Log("build_context", version.BuildContext())
	level.Info(logger).Log("host_details", promRuntime.Uname())
	level.Info(logger).Log("fd_limits", promRuntime.FdLimits())
	level.Info(logger).Log("vm_limits", promRuntime.VMLimits())

	// Build the discovery, scrape, and Target Allocator managers that make up the scrape pipeline.
	var (
		ctxScrape, cancelScrape = context.WithCancel(context.Background())
		sdMetrics, _            = discovery.CreateAndRegisterSDMetrics(prometheus.DefaultRegisterer)
		discoveryManagerScrape  = discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), prometheus.DefaultRegisterer, sdMetrics, discovery.Name("scrape"))

		scrapeManager, _ = scrape.NewManager(&scrape.Options{}, log.With(logger, "component", "scrape manager"), receiver, prometheus.DefaultRegisterer)
		taManager        = createTargetAllocatorManager(configFilePath, log.With(logger, "component", "ta manager"), logLevel, scrapeManager, discoveryManagerScrape)
	)

	level.Info(logger).Log("msg", fmt.Sprintf("Target Allocator  is %t", taManager.enabled))
	//Setup Target Allocator Scrape Post Process Handler
	taManager.AttachReloadConfigHandler(
		func(prometheusConfig *config.Config) {
			relabelScrapeConfigs(prometheusConfig, logger)
		},
	)

	mth.SetScrapeManager(scrapeManager)

	var reloaders = []func(cfg *config.Config) error{
		// The scrape manager needs to reload before the discovery manager so
		// it reads the most up-to-date config when receiving the new targets list.
		scrapeManager.ApplyConfig,
		func(cfg *config.Config) error {
			c := make(map[string]discovery.Configs)
			for _, v := range cfg.ScrapeConfigs {
				c[v.JobName] = v.ServiceDiscoveryConfigs
			}
			return discoveryManagerScrape.ApplyConfig(c)
		},
	}

	prometheus.MustRegister(configSuccess)
	prometheus.MustRegister(configSuccessTime)

	// sync.Once is used to make sure we can close the channel at different execution stages (shutdown signal or when the config is loaded).
	type closeOnce struct {
		C     chan struct{}
		once  sync.Once
		Close func()
	}
	// reloadReady is closed once the initial config load has finished, unblocking goroutines that wait on it.
	reloadReady := &closeOnce{
		C: make(chan struct{}),
	}
	reloadReady.Close = func() {
		reloadReady.once.Do(func() {
			close(reloadReady.C)
		})
	}
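	// run.Group runs each added actor's execute function on its own goroutine;
	// when any one of them returns, every actor's interrupt function is invoked,
	// so the components below start and stop together.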
	var g run.Group
	{
		// Termination handler.
		cancel := make(chan struct{})
		g.Add(
			func() error {
				// Close the reloadReady channel so that goroutines waiting on it can exit normally.
				select {
				case <-shutDownChan:
					level.Warn(logger).Log("msg", "Received ShutDown, exiting gracefully...")
					reloadReady.Close()

				case <-cancel:
					reloadReady.Close()
				}
				return nil
			},
			func(err error) {
				close(cancel)
			},
		)
	}
	{
		// Scrape discovery manager.
		g.Add(
			func() error {
				level.Info(logger).Log("msg", "Scrape discovery manager  starting")
				err := discoveryManagerScrape.Run()
				level.Info(logger).Log("msg", "Scrape discovery manager stopped", "error", err)
				return err
			},
			func(err error) {
				level.Info(logger).Log("msg", "Stopping scrape discovery manager...", "error", err)
				cancelScrape()
			},
		)
	}
	{
		// Scrape manager.
		g.Add(
			func() error {
				// When the scrape manager receives a new targets list
				// it needs to read a valid config for each job.
				// It depends on the config being in sync with the discovery manager so
				// we wait until the config is fully loaded.
				<-reloadReady.C

				level.Info(logger).Log("msg", "start discovery")
				err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
				level.Info(logger).Log("msg", "Scrape manager stopped", "error", err)
				return err
			},
			func(err error) {
				// The scrape manager needs to be stopped before the receiver is closed
				// so that it doesn't try to write samples to a closed storage.
				level.Info(logger).Log("msg", "Stopping scrape manager...", "error", err)
				scrapeManager.Stop()
			},
		)
	}
	{
		// Target Allocator manager.
		if taManager.enabled {
			g.Add(
				func() error {
					level.Info(logger).Log("msg", "Target Allocator manager starting")
					err := taManager.Run()
					level.Info(logger).Log("msg", "Target Allocator manager stopped", "error", err)
					return err
				},
				func(err error) {
					level.Info(logger).Log("msg", "Stopping ta manager...", "error", err)
					taManager.Shutdown()
				},
			)
		}
	}
	{
		// Reload handler.

		// Make sure that sighup handler is registered with a redirect to the channel before the potentially
		// long and synchronous tsdb init.
		hup := make(chan os.Signal, 1)
		signal.Notify(hup, syscall.SIGHUP)
		cancel := make(chan struct{})
		g.Add(
			func() error {
				<-reloadReady.C

				for {
					select {
					case <-hup:
						if err := reloadConfig(cfg.configFile, logger, taManager, reloaders...); err != nil {
							level.Error(logger).Log("msg", "Error reloading config", "err", err)
						}

					case <-cancel:
						return nil
					}
				}
			},
			func(err error) {
				// Wait for any in-progress reloads to complete so nothing is
				// reloaded after its component has been shut down.
				cancel <- struct{}{}
			},
		)
	}
	{
		// Initial configuration loading.
		cancel := make(chan struct{})
		g.Add(
			func() error {
				select {
				// In case a shutdown is initiated before the initial config load completes.
				case <-cancel:
					reloadReady.Close()
					return nil

				default:
				}
				if taManager.enabled {
					<-taManager.taReadyCh
				}
				level.Info(logger).Log("msg", "handling config file")
				if err := reloadConfig(cfg.configFile, logger, taManager, reloaders...); err != nil {
					return errors.Wrapf(err, "error loading config from %q", cfg.configFile)
				}
				level.Info(logger).Log("msg", "finish handling config file")

				reloadReady.Close()
				<-cancel
				return nil
			},
			func(err error) {
				close(cancel)
			},
		)
	}

	if err := g.Run(); err != nil {
		level.Error(logger).Log("err", err)
	}
	level.Info(logger).Log("msg", "See you next time!")
	wg.Done()
}
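
Usage sketch (not part of the file): a minimal illustration of how a caller might drive Start, assuming it already owns the receiver and metricsTypeHandler. The wrapper name and config path below are made up for the example.

// Hypothetical caller; everything except Start itself is an assumption.
func runPrometheusScrape(receiver storage.Appendable, mth *metricsTypeHandler) {
	shutDownChan := make(chan interface{})
	var wg sync.WaitGroup

	// Start blocks until its run.Group exits, so give it its own goroutine.
	wg.Add(1)
	go Start("/etc/prometheus/prometheus.yaml", receiver, shutDownChan, &wg, mth)

	// ... later, when the plugin is being stopped:
	close(shutDownChan) // unblocks the termination-handler actor
	wg.Wait()           // Start calls wg.Done() just before returning
}

Closing shutDownChan, rather than sending a value on it, means every goroutine selecting on the channel observes the shutdown.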