func main()

in cmd/rule-evaluator/main.go [101:482]


// main is the entry point of the rule-evaluator binary. It:
//   - refuses to start unless FIPS 140 mode is enabled;
//   - registers command-line flags, parses them (together with any extra
//     arguments returned by exportsetup.ExtraArgs) and validates the result;
//   - builds the exporter-backed storage, the notifier with its discovery
//     manager, and the rule evaluator;
//   - performs an initial configuration load, exiting on failure;
//   - runs the long-lived components, the HTTP server and the reload handler
//     as actors of a run.Group.
//
// The process exits with status 2 on command-line parsing errors and with
// status 1 on any other startup failure or when the run group returns an error.
func main() {
	ctx := context.Background()

	logger := log.NewJSONLogger(log.NewSyncWriter(os.Stderr))
	logger = log.With(logger, "ts", log.DefaultTimestampUTC)
	logger = log.With(logger, "caller", log.DefaultCaller)

	// Hard requirement: do not run at all outside of FIPS mode.
	if !fips140.Enabled() {
		_ = logger.Log("msg", "FIPS mode not enabled")
		os.Exit(1)
	}

	a := kingpin.New("rule", "The Prometheus Rule Evaluator")
	logLevel := a.Flag("log.level",
		"The level of logging. Can be one of 'debug', 'info', 'warn', 'error'").Default(
		"info").Enum("debug", "info", "warn", "error")

	a.HelpFlag.Short('h')

	// Best effort: when running on GCE, use the instance's project as the
	// default project ID. A lookup failure only produces a warning.
	var defaultProjectID string
	if metadata.OnGCE() {
		var err error
		defaultProjectID, err = metadata.ProjectIDWithContext(ctx)
		if err != nil {
			_ = level.Warn(logger).Log("msg", "Unable to detect Google Cloud project", "err", err)
		}
	}

	reg := prometheus.NewRegistry()
	reg.MustRegister(
		collectors.NewGoCollector(),
		collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
		grpc_prometheus.DefaultClientMetrics,
		queryCounter,
		queryHistogram,
	)

	// The rule-evaluator version is identical to the export library version for now, so
	// we reuse that constant.
	version, err := export.Version()
	if err != nil {
		_ = level.Error(logger).Log("msg", "Unable to fetch module version", "err", err)
		os.Exit(1)
	}

	opts := exportsetup.Opts{
		ExporterOpts: export.ExporterOpts{
			UserAgentProduct: fmt.Sprintf("rule-evaluator/%s", version),
		},
	}
	opts.SetupFlags(a)

	// These values serve as flag defaults; setupFlags binds the flags to this
	// struct, so after parsing it holds the effective command-line options.
	defaultEvaluatorOpts := evaluatorOptions{
		TargetURL:     Must(url.Parse(fmt.Sprintf("https://monitoring.googleapis.com/v1/projects/%s/location/global/prometheus", projectIDVar))),
		ProjectID:     defaultProjectID,
		DisableAuth:   false,
		ListenAddress: ":9091",
		ConfigFile:    "prometheus.yml",
		QueueCapacity: 10000,
	}
	defaultEvaluatorOpts.setupFlags(a)

	extraArgs, err := exportsetup.ExtraArgs()
	if err != nil {
		_ = level.Error(logger).Log("msg", "Error parsing commandline arguments", "err", err)
		a.Usage(os.Args[1:])
		os.Exit(2)
	}
	if _, err := a.Parse(append(os.Args[1:], extraArgs...)); err != nil {
		_ = level.Error(logger).Log("msg", "Error parsing commandline arguments", "err", err)
		a.Usage(os.Args[1:])
		os.Exit(2)
	}
	// Apply the requested log level; anything unrecognised falls back to info.
	switch strings.ToLower(*logLevel) {
	case "debug":
		logger = level.NewFilter(logger, level.AllowDebug())
	case "warn":
		logger = level.NewFilter(logger, level.AllowWarn())
	case "error":
		logger = level.NewFilter(logger, level.AllowError())
	default:
		logger = level.NewFilter(logger, level.AllowInfo())
	}

	if err := defaultEvaluatorOpts.validate(); err != nil {
		_ = level.Error(logger).Log("msg", "invalid command line argument", "err", err)
		os.Exit(1)
	}

	// Reported by the runtime-info endpoint below.
	startTime := time.Now()

	// The exporter and the discovery manager each get their own cancellable
	// context so that their run-group actors can stop them independently.
	ctxExporter, cancelExporter := context.WithCancel(ctx)
	exporter, err := opts.NewExporter(ctxExporter, logger, reg)
	if err != nil {
		_ = level.Error(logger).Log("msg", "Creating a Cloud Monitoring Exporter failed", "err", err)
		os.Exit(1)
	}
	destination := export.NewStorage(exporter)

	ctxDiscover, cancelDiscover := context.WithCancel(ctx)
	discoveryManager := discovery.NewManager(ctxDiscover, log.With(logger, "component", "discovery manager notify"), discovery.Name("notify"))
	notifierOptions := notifier.Options{
		Registerer:    reg,
		QueueCapacity: defaultEvaluatorOpts.QueueCapacity,
	}
	notificationManager := notifier.NewManager(&notifierOptions, log.With(logger, "component", "notifier"))
	rulesMetrics := rules.NewGroupMetrics(reg)
	ruleEvaluator, err := newRuleEvaluator(ctx, logger, &defaultEvaluatorOpts, version, destination, notificationManager, rulesMetrics)
	if err != nil {
		_ = level.Error(logger).Log("msg", "Create rule-evaluator", "err", err)
		os.Exit(1)
	}

	// Each reloader applies a freshly loaded configuration to one component.
	// The full list is handed to reloadConfig on startup and on every reload.
	reloaders := []reloader{
		{
			name: "notify",
			reloader: func(cfg *operator.RuleEvaluatorConfig) error {
				return notificationManager.ApplyConfig(&cfg.Config)
			},
		}, {
			name: "exporter",
			reloader: func(cfg *operator.RuleEvaluatorConfig) error {
				// Don't modify defaults. Copy defaults and modify based on config.
				exporterOpts := opts.ExporterOpts
				if cfg.GoogleCloud.Export != nil {
					exportConfig := cfg.GoogleCloud.Export
					if exportConfig.Compression != nil {
						exporterOpts.Compression = *exportConfig.Compression
					}
					if exportConfig.Match != nil {
						var selectors []labels.Selector
						for _, match := range exportConfig.Match {
							selector, err := parser.ParseMetricSelector(match)
							if err != nil {
								return fmt.Errorf("invalid metric matcher %q: %w", match, err)
							}
							selectors = append(selectors, selector)
						}
						exporterOpts.Matchers = selectors
					}
					if exportConfig.CredentialsFile != nil {
						exporterOpts.CredentialsFile = *exportConfig.CredentialsFile
					}

					// Only re-validate when the config file overrode something.
					if err := exporterOpts.Validate(); err != nil {
						return fmt.Errorf("unable to validate Google Cloud fields: %w", err)
					}
				}

				return destination.ApplyConfig(&cfg.Config, &exporterOpts)
			},
		}, {
			name: "notify_sd",
			reloader: func(cfg *operator.RuleEvaluatorConfig) error {
				// Hand the service discovery configs of every configured
				// Alertmanager to the discovery manager.
				c := make(map[string]discovery.Configs)
				for k, v := range cfg.AlertingConfig.AlertmanagerConfigs.ToMap() {
					c[k] = v.ServiceDiscoveryConfigs
				}
				return discoveryManager.ApplyConfig(c)
			},
		}, {
			name: "rules",
			reloader: func(cfg *operator.RuleEvaluatorConfig) error {
				// Don't modify defaults. Copy defaults and modify based on config.
				evaluatorOpts := defaultEvaluatorOpts
				if cfg.GoogleCloud.Query != nil {
					if cfg.GoogleCloud.Query.CredentialsFile != "" {
						evaluatorOpts.CredentialsFile = cfg.GoogleCloud.Query.CredentialsFile
					}
					if cfg.GoogleCloud.Query.GeneratorURL != "" {
						generatorURL, err := url.Parse(cfg.GoogleCloud.Query.GeneratorURL)
						if err != nil {
							return fmt.Errorf("unable to parse Google Cloud generator URL: %w", err)
						}
						evaluatorOpts.GeneratorURL = generatorURL
					}
					if cfg.GoogleCloud.Query.ProjectID != "" {
						evaluatorOpts.ProjectID = cfg.GoogleCloud.Query.ProjectID
					}
				}
				return ruleEvaluator.ApplyConfig(&cfg.Config, &evaluatorOpts)
			},
		},
	}

	configMetrics := newConfigMetrics(reg)

	// Do an initial load of the configuration for all components.
	if err := reloadConfig(defaultEvaluatorOpts.ConfigFile, logger, configMetrics, reloaders...); err != nil {
		_ = level.Error(logger).Log("msg", "error loading config file.", "err", err)
		os.Exit(1)
	}

	// Every actor added to g is a pair of functions: the first runs the
	// component, the second is invoked to make the first one return.
	var g run.Group
	{
		// Termination handler.
		term := make(chan os.Signal, 1)
		cancel := make(chan struct{})
		signal.Notify(term, os.Interrupt, syscall.SIGTERM)
		g.Add(
			func() error {
				// Return on the first termination signal or when interrupted.
				select {
				case <-term:
					_ = level.Info(logger).Log("msg", "received SIGTERM, exiting gracefully...")
				case <-cancel:
				}
				return nil
			},
			func(error) {
				close(cancel)
			},
		)
	}
	{
		// Rule manager.
		g.Add(func() error {
			ruleEvaluator.Run()
			return nil
		}, func(error) {
			ruleEvaluator.Stop()
		})
	}
	{
		// Notifier.
		g.Add(func() error {
			notificationManager.Run(discoveryManager.SyncCh())
			_ = level.Info(logger).Log("msg", "Notification manager stopped")
			return nil
		},
			func(error) {
				notificationManager.Stop()
			},
		)
	}
	{
		// Notify discovery manager.
		g.Add(
			func() error {
				err := discoveryManager.Run()
				_ = level.Info(logger).Log("msg", "Discovery manager stopped")
				return err
			},
			func(error) {
				_ = level.Info(logger).Log("msg", "Stopping Discovery manager...")
				cancelDiscover()
			},
		)
	}
	{
		// Storage Processing.
		g.Add(func() error {
			// Use a local error so that this goroutine never writes to the
			// err variable of the enclosing function.
			err := destination.Run()
			_ = level.Info(logger).Log("msg", "Background processing of storage stopped")
			return err
		}, func(error) {
			_ = level.Info(logger).Log("msg", "Stopping background storage processing...")
			cancelExporter()
		})
	}

	// The working directory is only reported by the runtime-info endpoint, so
	// a lookup failure is not fatal: log it and serve a placeholder instead.
	cwd, err := os.Getwd()
	if err != nil {
		_ = level.Warn(logger).Log("msg", "Unable to determine current working directory", "err", err)
		cwd = "<error retrieving current working directory>"
	}

	// reloadCh carries reload requests from the HTTP handler to the reload
	// actor; each request brings its own channel for the reload result.
	reloadCh := make(chan chan error)
	{
		// Web Server.
		// No Handler is set on the server, so it serves the handlers
		// registered on the default mux below.
		server := &http.Server{Addr: defaultEvaluatorOpts.ListenAddress}

		http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))
		http.HandleFunc("/-/reload", func(w http.ResponseWriter, r *http.Request) {
			if r.Method == http.MethodPost {
				// Hand the request to the reload actor and wait for its result.
				rc := make(chan error)
				reloadCh <- rc
				if err := <-rc; err != nil {
					http.Error(w, fmt.Sprintf("Failed to reload config: %s", err), http.StatusInternalServerError)
				}
			} else {
				http.Error(w, "Only POST requests allowed.", http.StatusMethodNotAllowed)
			}
		})
		http.HandleFunc("/-/healthy", func(w http.ResponseWriter, _ *http.Request) {
			w.WriteHeader(http.StatusOK)
		})
		http.HandleFunc("/-/ready", func(w http.ResponseWriter, _ *http.Request) {
			w.WriteHeader(http.StatusOK)
			fmt.Fprintf(w, "rule-evaluator is Ready.\n")
		})
		// https://prometheus.io/docs/prometheus/latest/querying/api/#runtime-information
		// Useful for knowing whether a config reload was successful.
		http.HandleFunc("/api/v1/status/runtimeinfo", func(w http.ResponseWriter, _ *http.Request) {
			runtimeInfo := apiv1.RuntimeInfo{
				StartTime:      startTime,
				CWD:            cwd,
				GoroutineCount: runtime.NumGoroutine(),
				GOMAXPROCS:     runtime.GOMAXPROCS(0),
				// A negative argument only queries the current memory limit.
				GOMEMLIMIT:          debug.SetMemoryLimit(-1),
				GOGC:                os.Getenv("GOGC"),
				GODEBUG:             os.Getenv("GODEBUG"),
				StorageRetention:    "0d",
				CorruptionCount:     0,
				ReloadConfigSuccess: configMetrics.lastReloadSuccess,
				LastConfigTime:      configMetrics.lastReloadSuccessTime,
			}
			// Named resp so that the response type is not shadowed.
			resp := response{
				Status: "success",
				Data:   runtimeInfo,
			}
			data, err := json.Marshal(resp)
			if err != nil {
				http.Error(w, fmt.Sprintf("Failed to marshal status: %s", err), http.StatusInternalServerError)
				return
			}

			if _, err := w.Write(data); err != nil {
				_ = level.Error(logger).Log("msg", "Unable to write runtime info status", "err", err)
			}
		})

		// https://prometheus.io/docs/prometheus/latest/querying/api/#build-information
		buildInfoHandler := promapi.BuildinfoHandlerFunc(log.With(logger, "handler", "buildinfo"), "rule-evaluator", version)
		http.HandleFunc("/api/v1/status/buildinfo", buildInfoHandler)

		// https://prometheus.io/docs/prometheus/latest/querying/api/#rules
		apiHandler := internal.NewAPI(logger, ruleEvaluator.rulesManager)
		http.HandleFunc("/api/v1/rules", apiHandler.HandleRulesEndpoint)
		http.HandleFunc("/api/v1/rules/", http.NotFound)

		// https://prometheus.io/docs/prometheus/latest/querying/api/#alerts
		http.HandleFunc("/api/v1/alerts", apiHandler.HandleAlertsEndpoint)

		g.Add(func() error {
			_ = level.Info(logger).Log("msg", "Starting web server", "listen", defaultEvaluatorOpts.ListenAddress)
			return server.ListenAndServe()
		}, func(error) {
			// Give in-flight requests up to a minute to complete.
			ctxServer, cancelServer := context.WithTimeout(ctx, time.Minute)
			if err := server.Shutdown(ctxServer); err != nil {
				_ = level.Error(logger).Log("msg", "Server failed to shut down gracefully.", "err", err)
			}
			cancelServer()
		})
	}
	{
		// Reload handler.
		hup := make(chan os.Signal, 1)
		signal.Notify(hup, syscall.SIGHUP)
		cancel := make(chan struct{})
		g.Add(
			func() error {
				for {
					select {
					case <-hup:
						// SIGHUP-triggered reload: failures are only logged.
						if err := reloadConfig(defaultEvaluatorOpts.ConfigFile, logger, configMetrics, reloaders...); err != nil {
							_ = level.Error(logger).Log("msg", "Error reloading config", "err", err)
						}
					case rc := <-reloadCh:
						// HTTP-triggered reload: the outcome is also reported
						// back to the waiting request handler.
						if err := reloadConfig(defaultEvaluatorOpts.ConfigFile, logger, configMetrics, reloaders...); err != nil {
							_ = level.Error(logger).Log("msg", "Error reloading config", "err", err)
							rc <- err
						} else {
							rc <- nil
						}
					case <-cancel:
						return nil
					}
				}
			},
			func(error) {
				// Wait for any in-progress reloads to complete to avoid
				// reloading things after they have been shutdown.
				// cancel is unbuffered, so this send blocks until the loop
				// above is back at its select.
				cancel <- struct{}{}
			},
		)
	}

	// Run a test query to check status of rule evaluator.
	// A failure is logged but does not prevent startup.
	_, err = ruleEvaluator.Query(ctx, "vector(1)", time.Now())
	if err != nil {
		_ = level.Error(logger).Log("msg", "Error querying Prometheus instance", "err", err)
	}

	if err := g.Run(); err != nil {
		_ = level.Error(logger).Log("msg", "Running rule evaluator failed", "err", err)
		os.Exit(1)
	}
}