in cmd/rule-evaluator/main.go [101:482]
func main() {
ctx := context.Background()
logger := log.NewJSONLogger(log.NewSyncWriter(os.Stderr))
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
logger = log.With(logger, "caller", log.DefaultCaller)
if !fips140.Enabled() {
_ = logger.Log("msg", "FIPS mode not enabled")
os.Exit(1)
}
a := kingpin.New("rule", "The Prometheus Rule Evaluator")
logLevel := a.Flag("log.level",
"The level of logging. Can be one of 'debug', 'info', 'warn', 'error'").Default(
"info").Enum("debug", "info", "warn", "error")
a.HelpFlag.Short('h')
var defaultProjectID string
if metadata.OnGCE() {
var err error
defaultProjectID, err = metadata.ProjectIDWithContext(ctx)
if err != nil {
_ = level.Warn(logger).Log("msg", "Unable to detect Google Cloud project", "err", err)
}
}
reg := prometheus.NewRegistry()
reg.MustRegister(
collectors.NewGoCollector(),
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
grpc_prometheus.DefaultClientMetrics,
queryCounter,
queryHistogram,
)
// The rule-evaluator version is identical to the export library version for now, so
// we reuse that constant.
version, err := export.Version()
if err != nil {
_ = level.Error(logger).Log("msg", "Unable to fetch module version", "err", err)
os.Exit(1)
}
opts := exportsetup.Opts{
ExporterOpts: export.ExporterOpts{
UserAgentProduct: fmt.Sprintf("rule-evaluator/%s", version),
},
}
opts.SetupFlags(a)
defaultEvaluatorOpts := evaluatorOptions{
TargetURL: Must(url.Parse(fmt.Sprintf("https://monitoring.googleapis.com/v1/projects/%s/location/global/prometheus", projectIDVar))),
ProjectID: defaultProjectID,
DisableAuth: false,
ListenAddress: ":9091",
ConfigFile: "prometheus.yml",
QueueCapacity: 10000,
}
defaultEvaluatorOpts.setupFlags(a)
extraArgs, err := exportsetup.ExtraArgs()
if err != nil {
_ = level.Error(logger).Log("msg", "Error parsing commandline arguments", "err", err)
a.Usage(os.Args[1:])
os.Exit(2)
}
if _, err := a.Parse(append(os.Args[1:], extraArgs...)); err != nil {
_ = level.Error(logger).Log("msg", "Error parsing commandline arguments", "err", err)
a.Usage(os.Args[1:])
os.Exit(2)
}
switch strings.ToLower(*logLevel) {
case "debug":
logger = level.NewFilter(logger, level.AllowDebug())
case "warn":
logger = level.NewFilter(logger, level.AllowWarn())
case "error":
logger = level.NewFilter(logger, level.AllowError())
default:
logger = level.NewFilter(logger, level.AllowInfo())
}
if err := defaultEvaluatorOpts.validate(); err != nil {
_ = level.Error(logger).Log("msg", "invalid command line argument", "err", err)
os.Exit(1)
}
startTime := time.Now()
ctxExporter, cancelExporter := context.WithCancel(ctx)
exporter, err := opts.NewExporter(ctxExporter, logger, reg)
if err != nil {
_ = level.Error(logger).Log("msg", "Creating a Cloud Monitoring Exporter failed", "err", err)
os.Exit(1)
}
destination := export.NewStorage(exporter)
ctxDiscover, cancelDiscover := context.WithCancel(ctx)
discoveryManager := discovery.NewManager(ctxDiscover, log.With(logger, "component", "discovery manager notify"), discovery.Name("notify"))
notifierOptions := notifier.Options{
Registerer: reg,
QueueCapacity: defaultEvaluatorOpts.QueueCapacity,
}
notificationManager := notifier.NewManager(¬ifierOptions, log.With(logger, "component", "notifier"))
rulesMetrics := rules.NewGroupMetrics(reg)
ruleEvaluator, err := newRuleEvaluator(ctx, logger, &defaultEvaluatorOpts, version, destination, notificationManager, rulesMetrics)
if err != nil {
_ = level.Error(logger).Log("msg", "Create rule-evaluator", "err", err)
os.Exit(1)
}
reloaders := []reloader{
{
name: "notify",
reloader: func(cfg *operator.RuleEvaluatorConfig) error {
return notificationManager.ApplyConfig(&cfg.Config)
},
}, {
name: "exporter",
reloader: func(cfg *operator.RuleEvaluatorConfig) error {
// Don't modify defaults. Copy defaults and modify based on config.
exporterOpts := opts.ExporterOpts
if cfg.GoogleCloud.Export != nil {
exportConfig := cfg.GoogleCloud.Export
if exportConfig.Compression != nil {
exporterOpts.Compression = *exportConfig.Compression
}
if exportConfig.Match != nil {
var selectors []labels.Selector
for _, match := range exportConfig.Match {
selector, err := parser.ParseMetricSelector(match)
if err != nil {
return fmt.Errorf("invalid metric matcher %q: %w", match, err)
}
selectors = append(selectors, selector)
}
exporterOpts.Matchers = selectors
}
if exportConfig.CredentialsFile != nil {
exporterOpts.CredentialsFile = *exportConfig.CredentialsFile
}
if err := exporterOpts.Validate(); err != nil {
return fmt.Errorf("unable to validate Google Cloud fields: %w", err)
}
}
return destination.ApplyConfig(&cfg.Config, &exporterOpts)
},
}, {
name: "notify_sd",
reloader: func(cfg *operator.RuleEvaluatorConfig) error {
c := make(map[string]discovery.Configs)
for k, v := range cfg.AlertingConfig.AlertmanagerConfigs.ToMap() {
c[k] = v.ServiceDiscoveryConfigs
}
return discoveryManager.ApplyConfig(c)
},
}, {
name: "rules",
reloader: func(cfg *operator.RuleEvaluatorConfig) error {
// Don't modify defaults. Copy defaults and modify based on config.
evaluatorOpts := defaultEvaluatorOpts
if cfg.GoogleCloud.Query != nil {
if cfg.GoogleCloud.Query.CredentialsFile != "" {
evaluatorOpts.CredentialsFile = cfg.GoogleCloud.Query.CredentialsFile
}
if cfg.GoogleCloud.Query.GeneratorURL != "" {
generatorURL, err := url.Parse(cfg.GoogleCloud.Query.GeneratorURL)
if err != nil {
return fmt.Errorf("unable to parse Google Cloud generator URL: %w", err)
}
evaluatorOpts.GeneratorURL = generatorURL
}
if cfg.GoogleCloud.Query.ProjectID != "" {
evaluatorOpts.ProjectID = cfg.GoogleCloud.Query.ProjectID
}
}
return ruleEvaluator.ApplyConfig(&cfg.Config, &evaluatorOpts)
},
},
}
configMetrics := newConfigMetrics(reg)
// Do an initial load of the configuration for all components.
if err := reloadConfig(defaultEvaluatorOpts.ConfigFile, logger, configMetrics, reloaders...); err != nil {
_ = level.Error(logger).Log("msg", "error loading config file.", "err", err)
os.Exit(1)
}
var g run.Group
{
// Termination handler.
term := make(chan os.Signal, 1)
cancel := make(chan struct{})
signal.Notify(term, os.Interrupt, syscall.SIGTERM)
g.Add(
func() error {
select {
case <-term:
_ = level.Info(logger).Log("msg", "received SIGTERM, exiting gracefully...")
case <-cancel:
}
return nil
},
func(error) {
close(cancel)
},
)
}
{
// Rule manager.
g.Add(func() error {
ruleEvaluator.Run()
return nil
}, func(error) {
ruleEvaluator.Stop()
})
}
{
// Notifier.
g.Add(func() error {
notificationManager.Run(discoveryManager.SyncCh())
_ = level.Info(logger).Log("msg", "Notification manager stopped")
return nil
},
func(error) {
notificationManager.Stop()
},
)
}
{
// Notify discovery manager.
g.Add(
func() error {
err := discoveryManager.Run()
_ = level.Info(logger).Log("msg", "Discovery manager stopped")
return err
},
func(error) {
_ = level.Info(logger).Log("msg", "Stopping Discovery manager...")
cancelDiscover()
},
)
}
{
// Storage Processing.
g.Add(func() error {
err = destination.Run()
_ = level.Info(logger).Log("msg", "Background processing of storage stopped")
return err
}, func(error) {
_ = level.Info(logger).Log("msg", "Stopping background storage processing...")
cancelExporter()
})
}
cwd, err := os.Getwd()
reloadCh := make(chan chan error)
{
// Web Server.
server := &http.Server{Addr: defaultEvaluatorOpts.ListenAddress}
http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))
http.HandleFunc("/-/reload", func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodPost {
rc := make(chan error)
reloadCh <- rc
if err := <-rc; err != nil {
http.Error(w, fmt.Sprintf("Failed to reload config: %s", err), http.StatusInternalServerError)
}
} else {
http.Error(w, "Only POST requests allowed.", http.StatusMethodNotAllowed)
}
})
http.HandleFunc("/-/healthy", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
})
http.HandleFunc("/-/ready", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "rule-evaluator is Ready.\n")
})
// https://prometheus.io/docs/prometheus/latest/querying/api/#runtime-information
// Useful for knowing whether a config reload was successful.
http.HandleFunc("/api/v1/status/runtimeinfo", func(w http.ResponseWriter, _ *http.Request) {
runtimeInfo := apiv1.RuntimeInfo{
StartTime: startTime,
CWD: cwd,
GoroutineCount: runtime.NumGoroutine(),
GOMAXPROCS: runtime.GOMAXPROCS(0),
GOMEMLIMIT: debug.SetMemoryLimit(-1),
GOGC: os.Getenv("GOGC"),
GODEBUG: os.Getenv("GODEBUG"),
StorageRetention: "0d",
CorruptionCount: 0,
ReloadConfigSuccess: configMetrics.lastReloadSuccess,
LastConfigTime: configMetrics.lastReloadSuccessTime,
}
response := response{
Status: "success",
Data: runtimeInfo,
}
data, err := json.Marshal(response)
if err != nil {
http.Error(w, fmt.Sprintf("Failed to marshal status: %s", err), http.StatusInternalServerError)
return
}
if _, err := w.Write(data); err != nil {
_ = level.Error(logger).Log("msg", "Unable to write runtime info status", "err", err)
}
})
// https://prometheus.io/docs/prometheus/latest/querying/api/#build-information
buildInfoHandler := promapi.BuildinfoHandlerFunc(log.With(logger, "handler", "buildinfo"), "rule-evaluator", version)
http.HandleFunc("/api/v1/status/buildinfo", buildInfoHandler)
// https://prometheus.io/docs/prometheus/latest/querying/api/#rules
apiHandler := internal.NewAPI(logger, ruleEvaluator.rulesManager)
http.HandleFunc("/api/v1/rules", apiHandler.HandleRulesEndpoint)
http.HandleFunc("/api/v1/rules/", http.NotFound)
// https://prometheus.io/docs/prometheus/latest/querying/api/#alerts
http.HandleFunc("/api/v1/alerts", apiHandler.HandleAlertsEndpoint)
g.Add(func() error {
_ = level.Info(logger).Log("msg", "Starting web server", "listen", defaultEvaluatorOpts.ListenAddress)
return server.ListenAndServe()
}, func(error) {
ctxServer, cancelServer := context.WithTimeout(ctx, time.Minute)
if err := server.Shutdown(ctxServer); err != nil {
_ = level.Error(logger).Log("msg", "Server failed to shut down gracefully.")
}
cancelServer()
})
}
{
// Reload handler.
hup := make(chan os.Signal, 1)
signal.Notify(hup, syscall.SIGHUP)
cancel := make(chan struct{})
g.Add(
func() error {
for {
select {
case <-hup:
if err := reloadConfig(defaultEvaluatorOpts.ConfigFile, logger, configMetrics, reloaders...); err != nil {
_ = level.Error(logger).Log("msg", "Error reloading config", "err", err)
}
case rc := <-reloadCh:
if err := reloadConfig(defaultEvaluatorOpts.ConfigFile, logger, configMetrics, reloaders...); err != nil {
_ = level.Error(logger).Log("msg", "Error reloading config", "err", err)
rc <- err
} else {
rc <- nil
}
case <-cancel:
return nil
}
}
},
func(error) {
// Wait for any in-progress reloads to complete to avoid
// reloading things after they have been shutdown.
cancel <- struct{}{}
},
)
}
// Run a test query to check status of rule evaluator.
_, err = ruleEvaluator.Query(ctx, "vector(1)", time.Now())
if err != nil {
_ = level.Error(logger).Log("msg", "Error querying Prometheus instance", "err", err)
}
if err := g.Run(); err != nil {
_ = level.Error(logger).Log("msg", "Running rule evaluator failed", "err", err)
os.Exit(1)
}
}