cmd/rule-evaluator/main.go

// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"context"
	"crypto/fips140"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/url"
	"os"
	"os/signal"
	"path/filepath"
	"reflect"
	"runtime"
	"runtime/debug"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	"cloud.google.com/go/compute/metadata"
	"github.com/GoogleCloudPlatform/prometheus-engine/cmd/rule-evaluator/internal"
	"github.com/GoogleCloudPlatform/prometheus-engine/internal/promapi"
	"github.com/GoogleCloudPlatform/prometheus-engine/pkg/export"
	exportsetup "github.com/GoogleCloudPlatform/prometheus-engine/pkg/export/setup"
	"github.com/GoogleCloudPlatform/prometheus-engine/pkg/operator"
	"github.com/alecthomas/kingpin/v2"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
	"github.com/oklog/run"
	apiv1 "github.com/prometheus/prometheus/web/api/v1"
	"google.golang.org/api/option"
	apihttp "google.golang.org/api/transport/http"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"gopkg.in/yaml.v3"

	"github.com/prometheus/client_golang/api"
	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/collectors"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery"

	// Import to enable 'kubernetes_sd_configs' to SD config register.
	_ "github.com/prometheus/prometheus/discovery/kubernetes"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/notifier"
	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/promql/parser"
	"github.com/prometheus/prometheus/rules"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/util/strutil"
)

const projectIDVar = "PROJECT_ID"

var (
	googleCloudBaseURL = url.URL{
		Scheme: "https",
		Host:   "console.cloud.google.com",
		Path:   "/monitoring/metrics-explorer",
	}

	queryCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "rule_evaluator_query_requests_total",
			Help: "A counter for query requests sent to GCM.",
		},
		[]string{"code", "method"},
	)
	queryHistogram = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "rule_evaluator_query_requests_latency_seconds",
			Help:    "Histogram of response latency of query requests sent to GCM.",
			Buckets: prometheus.DefBuckets,
		},
		[]string{"code", "method"},
	)
)

func main() {
	ctx := context.Background()

	logger := log.NewJSONLogger(log.NewSyncWriter(os.Stderr))
	logger = log.With(logger, "ts", log.DefaultTimestampUTC)
	logger = log.With(logger, "caller", log.DefaultCaller)

	if !fips140.Enabled() {
		_ = logger.Log("msg", "FIPS mode not enabled")
		os.Exit(1)
	}

	a := kingpin.New("rule", "The Prometheus Rule Evaluator")

	logLevel := a.Flag("log.level", "The level of logging. Can be one of 'debug', 'info', 'warn', 'error'").
		Default("info").Enum("debug", "info", "warn", "error")

	a.HelpFlag.Short('h')

	var defaultProjectID string
	if metadata.OnGCE() {
		var err error
		defaultProjectID, err = metadata.ProjectIDWithContext(ctx)
		if err != nil {
			_ = level.Warn(logger).Log("msg", "Unable to detect Google Cloud project", "err", err)
		}
	}

	reg := prometheus.NewRegistry()
	reg.MustRegister(
		collectors.NewGoCollector(),
		collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
		grpc_prometheus.DefaultClientMetrics,
		queryCounter,
		queryHistogram,
	)

	// The rule-evaluator version is identical to the export library version for now, so
	// we reuse that constant.
	version, err := export.Version()
	if err != nil {
		_ = level.Error(logger).Log("msg", "Unable to fetch module version", "err", err)
		os.Exit(1)
	}

	opts := exportsetup.Opts{
		ExporterOpts: export.ExporterOpts{
			UserAgentProduct: fmt.Sprintf("rule-evaluator/%s", version),
		},
	}
	opts.SetupFlags(a)

	defaultEvaluatorOpts := evaluatorOptions{
		TargetURL:     Must(url.Parse(fmt.Sprintf("https://monitoring.googleapis.com/v1/projects/%s/location/global/prometheus", projectIDVar))),
		ProjectID:     defaultProjectID,
		DisableAuth:   false,
		ListenAddress: ":9091",
		ConfigFile:    "prometheus.yml",
		QueueCapacity: 10000,
	}
	defaultEvaluatorOpts.setupFlags(a)

	extraArgs, err := exportsetup.ExtraArgs()
	if err != nil {
		_ = level.Error(logger).Log("msg", "Error parsing commandline arguments", "err", err)
		a.Usage(os.Args[1:])
		os.Exit(2)
	}
	if _, err := a.Parse(append(os.Args[1:], extraArgs...)); err != nil {
		_ = level.Error(logger).Log("msg", "Error parsing commandline arguments", "err", err)
		a.Usage(os.Args[1:])
		os.Exit(2)
	}

	switch strings.ToLower(*logLevel) {
	case "debug":
		logger = level.NewFilter(logger, level.AllowDebug())
	case "warn":
		logger = level.NewFilter(logger, level.AllowWarn())
	case "error":
		logger = level.NewFilter(logger, level.AllowError())
	default:
		logger = level.NewFilter(logger, level.AllowInfo())
	}

	if err := defaultEvaluatorOpts.validate(); err != nil {
		_ = level.Error(logger).Log("msg", "invalid command line argument", "err", err)
		os.Exit(1)
	}

	startTime := time.Now()

	ctxExporter, cancelExporter := context.WithCancel(ctx)
	exporter, err := opts.NewExporter(ctxExporter, logger, reg)
	if err != nil {
		_ = level.Error(logger).Log("msg", "Creating a Cloud Monitoring Exporter failed", "err", err)
		os.Exit(1)
	}
	destination := export.NewStorage(exporter)

	ctxDiscover, cancelDiscover := context.WithCancel(ctx)
	discoveryManager := discovery.NewManager(ctxDiscover, log.With(logger, "component", "discovery manager notify"), discovery.Name("notify"))
	notifierOptions := notifier.Options{
		Registerer:    reg,
		QueueCapacity: defaultEvaluatorOpts.QueueCapacity,
	}
	notificationManager := notifier.NewManager(&notifierOptions, log.With(logger, "component", "notifier"))

	rulesMetrics := rules.NewGroupMetrics(reg)
	ruleEvaluator, err := newRuleEvaluator(ctx, logger, &defaultEvaluatorOpts, version, destination, notificationManager, rulesMetrics)
	if err != nil {
		_ = level.Error(logger).Log("msg", "Create rule-evaluator", "err", err)
		os.Exit(1)
	}

	reloaders := []reloader{
		{
			name: "notify",
			reloader: func(cfg *operator.RuleEvaluatorConfig) error {
				return notificationManager.ApplyConfig(&cfg.Config)
			},
		}, {
			name: "exporter",
			reloader: func(cfg *operator.RuleEvaluatorConfig) error {
				// Don't modify defaults. Copy defaults and modify based on config.
				exporterOpts := opts.ExporterOpts
				if cfg.GoogleCloud.Export != nil {
					exportConfig := cfg.GoogleCloud.Export
					if exportConfig.Compression != nil {
						exporterOpts.Compression = *exportConfig.Compression
					}
					if exportConfig.Match != nil {
						var selectors []labels.Selector
						for _, match := range exportConfig.Match {
							selector, err := parser.ParseMetricSelector(match)
							if err != nil {
								return fmt.Errorf("invalid metric matcher %q: %w", match, err)
							}
							selectors = append(selectors, selector)
						}
						exporterOpts.Matchers = selectors
					}
					if exportConfig.CredentialsFile != nil {
						exporterOpts.CredentialsFile = *exportConfig.CredentialsFile
					}
					if err := exporterOpts.Validate(); err != nil {
						return fmt.Errorf("unable to validate Google Cloud fields: %w", err)
					}
				}
				return destination.ApplyConfig(&cfg.Config, &exporterOpts)
			},
		}, {
			name: "notify_sd",
			reloader: func(cfg *operator.RuleEvaluatorConfig) error {
				c := make(map[string]discovery.Configs)
				for k, v := range cfg.AlertingConfig.AlertmanagerConfigs.ToMap() {
					c[k] = v.ServiceDiscoveryConfigs
				}
				return discoveryManager.ApplyConfig(c)
			},
		}, {
			name: "rules",
			reloader: func(cfg *operator.RuleEvaluatorConfig) error {
				// Don't modify defaults. Copy defaults and modify based on config.
				evaluatorOpts := defaultEvaluatorOpts
				if cfg.GoogleCloud.Query != nil {
					if cfg.GoogleCloud.Query.CredentialsFile != "" {
						evaluatorOpts.CredentialsFile = cfg.GoogleCloud.Query.CredentialsFile
					}
					if cfg.GoogleCloud.Query.GeneratorURL != "" {
						generatorURL, err := url.Parse(cfg.GoogleCloud.Query.GeneratorURL)
						if err != nil {
							return fmt.Errorf("unable to parse Google Cloud generator URL: %w", err)
						}
						evaluatorOpts.GeneratorURL = generatorURL
					}
					if cfg.GoogleCloud.Query.ProjectID != "" {
						evaluatorOpts.ProjectID = cfg.GoogleCloud.Query.ProjectID
					}
				}
				return ruleEvaluator.ApplyConfig(&cfg.Config, &evaluatorOpts)
			},
		},
	}

	configMetrics := newConfigMetrics(reg)
	// Do an initial load of the configuration for all components.
	if err := reloadConfig(defaultEvaluatorOpts.ConfigFile, logger, configMetrics, reloaders...); err != nil {
		_ = level.Error(logger).Log("msg", "error loading config file.", "err", err)
		os.Exit(1)
	}

	var g run.Group
	{
		// Termination handler.
		term := make(chan os.Signal, 1)
		cancel := make(chan struct{})
		signal.Notify(term, os.Interrupt, syscall.SIGTERM)

		g.Add(
			func() error {
				select {
				case <-term:
					_ = level.Info(logger).Log("msg", "received SIGTERM, exiting gracefully...")
				case <-cancel:
				}
				return nil
			},
			func(error) {
				close(cancel)
			},
		)
	}
	{
		// Rule manager.
		g.Add(func() error {
			ruleEvaluator.Run()
			return nil
		}, func(error) {
			ruleEvaluator.Stop()
		})
	}
	{
		// Notifier.
		g.Add(func() error {
			notificationManager.Run(discoveryManager.SyncCh())
			_ = level.Info(logger).Log("msg", "Notification manager stopped")
			return nil
		},
			func(error) {
				notificationManager.Stop()
			},
		)
	}
	{
		// Notify discovery manager.
		g.Add(
			func() error {
				err := discoveryManager.Run()
				_ = level.Info(logger).Log("msg", "Discovery manager stopped")
				return err
			},
			func(error) {
				_ = level.Info(logger).Log("msg", "Stopping Discovery manager...")
				cancelDiscover()
			},
		)
	}
	{
		// Storage Processing.
		g.Add(func() error {
			err = destination.Run()
			_ = level.Info(logger).Log("msg", "Background processing of storage stopped")
			return err
		}, func(error) {
			_ = level.Info(logger).Log("msg", "Stopping background storage processing...")
			cancelExporter()
		})
	}
	cwd, err := os.Getwd()
	reloadCh := make(chan chan error)
	{
		// Web Server.
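		// The server registered below exposes Prometheus-style lifecycle and status
		// endpoints (/metrics, /-/reload, /-/healthy, /-/ready) plus a read-only
		// subset of the Prometheus HTTP API (/api/v1/status/*, /api/v1/rules,
		// /api/v1/alerts), as wired up by the handlers that follow.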
		server := &http.Server{Addr: defaultEvaluatorOpts.ListenAddress}
		http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))

		http.HandleFunc("/-/reload", func(w http.ResponseWriter, r *http.Request) {
			if r.Method == http.MethodPost {
				rc := make(chan error)
				reloadCh <- rc
				if err := <-rc; err != nil {
					http.Error(w, fmt.Sprintf("Failed to reload config: %s", err), http.StatusInternalServerError)
				}
			} else {
				http.Error(w, "Only POST requests allowed.", http.StatusMethodNotAllowed)
			}
		})

		http.HandleFunc("/-/healthy", func(w http.ResponseWriter, _ *http.Request) {
			w.WriteHeader(http.StatusOK)
		})
		http.HandleFunc("/-/ready", func(w http.ResponseWriter, _ *http.Request) {
			w.WriteHeader(http.StatusOK)
			fmt.Fprintf(w, "rule-evaluator is Ready.\n")
		})

		// https://prometheus.io/docs/prometheus/latest/querying/api/#runtime-information
		// Useful for knowing whether a config reload was successful.
		http.HandleFunc("/api/v1/status/runtimeinfo", func(w http.ResponseWriter, _ *http.Request) {
			runtimeInfo := apiv1.RuntimeInfo{
				StartTime:           startTime,
				CWD:                 cwd,
				GoroutineCount:      runtime.NumGoroutine(),
				GOMAXPROCS:          runtime.GOMAXPROCS(0),
				GOMEMLIMIT:          debug.SetMemoryLimit(-1),
				GOGC:                os.Getenv("GOGC"),
				GODEBUG:             os.Getenv("GODEBUG"),
				StorageRetention:    "0d",
				CorruptionCount:     0,
				ReloadConfigSuccess: configMetrics.lastReloadSuccess,
				LastConfigTime:      configMetrics.lastReloadSuccessTime,
			}
			response := response{
				Status: "success",
				Data:   runtimeInfo,
			}
			data, err := json.Marshal(response)
			if err != nil {
				http.Error(w, fmt.Sprintf("Failed to marshal status: %s", err), http.StatusInternalServerError)
				return
			}
			if _, err := w.Write(data); err != nil {
				_ = level.Error(logger).Log("msg", "Unable to write runtime info status", "err", err)
			}
		})
		// https://prometheus.io/docs/prometheus/latest/querying/api/#build-information
		buildInfoHandler := promapi.BuildinfoHandlerFunc(log.With(logger, "handler", "buildinfo"), "rule-evaluator", version)
		http.HandleFunc("/api/v1/status/buildinfo", buildInfoHandler)

		// https://prometheus.io/docs/prometheus/latest/querying/api/#rules
		apiHandler := internal.NewAPI(logger, ruleEvaluator.rulesManager)
		http.HandleFunc("/api/v1/rules", apiHandler.HandleRulesEndpoint)
		http.HandleFunc("/api/v1/rules/", http.NotFound)
		// https://prometheus.io/docs/prometheus/latest/querying/api/#alerts
		http.HandleFunc("/api/v1/alerts", apiHandler.HandleAlertsEndpoint)

		g.Add(func() error {
			_ = level.Info(logger).Log("msg", "Starting web server", "listen", defaultEvaluatorOpts.ListenAddress)
			return server.ListenAndServe()
		}, func(error) {
			ctxServer, cancelServer := context.WithTimeout(ctx, time.Minute)
			if err := server.Shutdown(ctxServer); err != nil {
				_ = level.Error(logger).Log("msg", "Server failed to shut down gracefully.")
			}
			cancelServer()
		})
	}
	{
		// Reload handler.
		hup := make(chan os.Signal, 1)
		signal.Notify(hup, syscall.SIGHUP)
		cancel := make(chan struct{})
		g.Add(
			func() error {
				for {
					select {
					case <-hup:
						if err := reloadConfig(defaultEvaluatorOpts.ConfigFile, logger, configMetrics, reloaders...); err != nil {
							_ = level.Error(logger).Log("msg", "Error reloading config", "err", err)
						}
					case rc := <-reloadCh:
						if err := reloadConfig(defaultEvaluatorOpts.ConfigFile, logger, configMetrics, reloaders...); err != nil {
							_ = level.Error(logger).Log("msg", "Error reloading config", "err", err)
							rc <- err
						} else {
							rc <- nil
						}
					case <-cancel:
						return nil
					}
				}
			},
			func(error) {
				// Wait for any in-progress reloads to complete to avoid
				// reloading things after they have been shutdown.
				cancel <- struct{}{}
			},
		)
	}

	// Run a test query to check status of rule evaluator.
	_, err = ruleEvaluator.Query(ctx, "vector(1)", time.Now())
	if err != nil {
		_ = level.Error(logger).Log("msg", "Error querying Prometheus instance", "err", err)
	}

	if err := g.Run(); err != nil {
		_ = level.Error(logger).Log("msg", "Running rule evaluator failed", "err", err)
		os.Exit(1)
	}
}

// Must panics if there's any error.
func Must[T any](value T, err error) T {
	if err != nil {
		panic(err)
	}
	return value
}

type evaluatorOptions struct {
	TargetURL       *url.URL
	ProjectID       string
	GeneratorURL    *url.URL
	CredentialsFile string
	DisableAuth     bool
	ListenAddress   string
	ConfigFile      string
	QueueCapacity   int
}

func (opts *evaluatorOptions) setupFlags(a *kingpin.Application) {
	a.Flag("query.project-id", "Project ID of the Google Cloud Monitoring scoping project to evaluate rules against.").
		Default(opts.ProjectID).
		StringVar(&opts.ProjectID)
	a.Flag("query.target-url", fmt.Sprintf("The address of the Prometheus server query endpoint. (%s is replaced with the --query.project-id flag.)", projectIDVar)).
		Default(opts.TargetURL.String()).
		URLVar(&opts.TargetURL)
	a.Flag("query.generator-url", "The base URL used for the generator URL in the alert notification payload. Should point to an instance of a query frontend that accesses the same data as --query.target-url.").
		Default(googleCloudBaseURL.String()).
		URLVar(&opts.GeneratorURL)
	a.Flag("query.credentials-file", "Credentials file for OAuth2 authentication with --query.target-url.").
		PlaceHolder("<FILE>").
		StringVar(&opts.CredentialsFile)
	a.Flag("query.debug.disable-auth", "Disable authentication (for debugging purposes).").
		Default("false").
		BoolVar(&opts.DisableAuth)
	a.Flag("web.listen-address", "The address to listen on for HTTP requests.").
		Default(":9091").
		StringVar(&opts.ListenAddress)
	a.Flag("config.file", "Prometheus configuration file path.").
		Default(opts.ConfigFile).
		StringVar(&opts.ConfigFile)
	a.Flag("alertmanager.notification-queue-capacity", "The capacity of the queue for pending Alertmanager notifications.").
		Default(strconv.Itoa(opts.QueueCapacity)).
		IntVar(&opts.QueueCapacity)
}

func (opts *evaluatorOptions) validate() error {
	contents, err := os.ReadFile(opts.ConfigFile)
	if err != nil {
		return fmt.Errorf("read config %q: %w", opts.ConfigFile, err)
	}
	cfg, err := loadConfig(contents)
	if err != nil {
		return fmt.Errorf("load config %q: %w", opts.ConfigFile, err)
	}
	if opts.ProjectID == "" && cfg.GoogleCloud.Query != nil {
		opts.ProjectID = cfg.GoogleCloud.Query.ProjectID
	}
	// Pass a placeholder project ID value "x" to ensure the URL replacement is valid.
	if _, err := url.Parse(strings.ReplaceAll(opts.TargetURL.String(), projectIDVar, "x")); err != nil {
		return fmt.Errorf("unable to parse --query.target-url value %q: %w", opts.TargetURL.String(), err)
	}
	return nil
}

// newAPI returns a Prometheus v1 API client for the configured query target URL,
// instrumented with the query request metrics.
func newAPI(ctx context.Context, opts *evaluatorOptions, version string) (v1.API, error) {
	clientOpts := []option.ClientOption{
		option.WithScopes("https://www.googleapis.com/auth/monitoring.read"),
		option.WithUserAgent(fmt.Sprintf("rule-evaluator/%s", version)),
	}
	if opts.CredentialsFile != "" {
		clientOpts = append(clientOpts, option.WithCredentialsFile(opts.CredentialsFile))
	}
	if opts.DisableAuth {
		clientOpts = append(clientOpts,
			option.WithoutAuthentication(),
			option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
		)
	}
	transport, err := apihttp.NewTransport(ctx, http.DefaultTransport, clientOpts...)
	if err != nil {
		return nil, err
	}
	roundTripper := promhttp.InstrumentRoundTripperCounter(queryCounter,
		promhttp.InstrumentRoundTripperDuration(queryHistogram, transport))
	client, err := api.NewClient(api.Config{
		Address:      strings.ReplaceAll(opts.TargetURL.String(), projectIDVar, opts.ProjectID),
		RoundTripper: roundTripper,
	})
	if err != nil {
		return nil, err
	}
	return v1.NewAPI(client), nil
}

// response wraps all Prometheus API responses.
type response struct {
	Status string `json:"status"`
	Data   any    `json:"data,omitempty"`
}

// QueryFunc queries a Prometheus instance and returns a promql.Vector.
func QueryFunc(ctx context.Context, q string, t time.Time, v1api v1.API) (parser.Value, v1.Warnings, error) {
	results, warnings, err := v1api.Query(ctx, q, t)
	if err != nil {
		return nil, warnings, fmt.Errorf("error querying Prometheus: %w", err)
	}
	v, err := convertModelToPromQLValue(results)
	return v, warnings, err
}

// sendAlerts returns the rules.NotifyFunc for a Notifier.
func sendAlerts(s *notifier.Manager, projectID string, generatorURL *url.URL) rules.NotifyFunc {
	return func(_ context.Context, expr string, alerts ...*rules.Alert) {
		var res []*notifier.Alert

		for _, alert := range alerts {
			a := &notifier.Alert{
				StartsAt:    alert.FiredAt,
				Labels:      alert.Labels,
				Annotations: alert.Annotations,
			}
			if !alert.ResolvedAt.IsZero() {
				a.EndsAt = alert.ResolvedAt
			} else {
				a.EndsAt = alert.ValidUntil
			}
			if generatorURL != nil {
				if generatorURL.String() == googleCloudBaseURL.String() {
					// Project ID is empty when the rule-evaluator is instantiated, before config-reloader runs.
					if projectID != "" {
						// If it's a GCM link (default), create the full URL for the alert.
						a.GeneratorURL = googleCloudLink(projectID, expr, alert.FiredAt, alert.FiredAt.Add(-time.Hour)).String()
					}
				} else {
					// Otherwise, if it was specified we assume it points to a Prometheus frontend.
					a.GeneratorURL = generatorURL.String() + strutil.TableLinkForExpression(expr)
				}
			}
			res = append(res, a)
		}
		if len(alerts) > 0 {
			s.Send(res...)
		}
	}
}

// googleCloudLink returns the link to the Google Cloud project, optionally with a pre-populated
// query. The passed project must be a valid project.
func googleCloudLink(projectID, expr string, endTime, startTime time.Time) *url.URL {
	// Clone URL to avoid mutating the original.
	url := googleCloudBaseURL
	// Note: The URL API was reverse-engineered.
	if !endTime.IsZero() {
		url.Path += ";endTime=" + endTime.Format(time.RFC3339)
	}
	if !startTime.IsZero() {
		url.Path += ";startTime=" + startTime.Format(time.RFC3339)
	}
	values := url.Query()
	values.Set("project", projectID)
	if expr != "" {
		// These settings reflect the default on metrics explorer for majority of use-cases.
		pageState := map[string]any{
			"xyChart": map[string]any{
				"dataSets": []map[string]any{
					{
						"prometheusQuery": expr,
					},
				},
			},
		}
		// Note, this also escapes the JSON.
		pageStateValue, err := json.Marshal(pageState)
		if err != nil {
			panic(err)
		}
		values.Set("pageState", string(pageStateValue))
	}
	// Escapes the query (which may have escaped JSON).
	url.RawQuery = values.Encode()
	return &url
}

type reloader struct {
	name     string
	reloader func(*operator.RuleEvaluatorConfig) error
}

// configMetrics establishes reloading metrics similar to Prometheus' built-in ones.
type configMetrics struct {
	lastReloadSuccess     bool
	lastReloadSuccessTime time.Time

	reloadSuccessMetric     prometheus.Gauge
	reloadSuccessTimeMetric prometheus.Gauge
}

func newConfigMetrics(reg prometheus.Registerer) *configMetrics {
	m := &configMetrics{
		reloadSuccessMetric: prometheus.NewGauge(prometheus.GaugeOpts{
			Name: "rule_evaluator_config_last_reload_successful",
			Help: "Whether the last configuration reload attempt was successful.",
		}),
		reloadSuccessTimeMetric: prometheus.NewGauge(prometheus.GaugeOpts{
			Name: "rule_evaluator_config_last_reload_success_timestamp_seconds",
			Help: "Timestamp of the last successful configuration reload.",
		}),
	}
	if reg != nil {
		reg.MustRegister(m.reloadSuccessMetric, m.reloadSuccessTimeMetric)
	}
	return m
}

func (m *configMetrics) setSuccess() {
	m.lastReloadSuccess = true
	m.lastReloadSuccessTime = time.Now()
	m.reloadSuccessMetric.Set(1)
	m.reloadSuccessTimeMetric.SetToCurrentTime()
}

func (m *configMetrics) setFailure() {
	m.lastReloadSuccess = false
	m.reloadSuccessMetric.Set(0)
}

// reloadConfig applies the configuration file to all reloaders.
func reloadConfig(filename string, logger log.Logger, metrics *configMetrics, rls ...reloader) (err error) {
	start := time.Now()
	timings := []interface{}{}
	_ = level.Info(logger).Log("msg", "Loading configuration file", "filename", filename)

	content, err := os.ReadFile(filename)
	if err != nil {
		return fmt.Errorf("read configuration (--config.file=%q): %w", filename, err)
	}
	conf, err := loadConfig(content)
	if err != nil {
		metrics.setFailure()
		return fmt.Errorf("load configuration (--config.file=%q): %w", filename, err)
	}

	failed := false
	for _, rl := range rls {
		rstart := time.Now()
		if err := rl.reloader(conf); err != nil {
			_ = level.Error(logger).Log("msg", "Failed to apply configuration", "err", err)
			failed = true
		}
		timings = append(timings, rl.name, time.Since(rstart))
	}
	if failed {
		metrics.setFailure()
		return fmt.Errorf("one or more errors occurred while applying the new configuration (--config.file=%q)", filename)
	}
	metrics.setSuccess()
	l := []interface{}{"msg", "Completed loading of configuration file", "filename", filename, "totalDuration", time.Since(start)}
	_ = level.Info(logger).Log(append(l, timings...)...)
	return nil
}

// loadConfig unmarshals a RuleEvaluatorConfig on top of the Prometheus default configuration.
func loadConfig(content []byte) (*operator.RuleEvaluatorConfig, error) {
	conf := &operator.RuleEvaluatorConfig{
		Config: config.DefaultConfig,
	}
	// Don't expand external labels on config file loading. It's a feature we like, but we
	// want to remain compatible with Prometheus, and this is still an experimental feature
	// there, which we don't support. See Prometheus' config.LoadFile method.
	if err := yaml.Unmarshal(content, conf); err != nil {
		return nil, fmt.Errorf("unmarshal: %w", err)
	}
	return conf, nil
}

// convertMetricToLabel converts a model.Metric to labels.Labels.
func convertMetricToLabel(metric model.Metric) labels.Labels {
	ls := make(labels.Labels, 0, len(metric))
	for name, value := range metric {
		l := labels.Label{
			Name:  string(name),
			Value: string(value),
		}
		ls = append(ls, l)
	}
	return ls
}

// convertModelToPromQLValue converts model.Value type to promql type.
func convertModelToPromQLValue(val model.Value) (parser.Value, error) {
	switch results := val.(type) {
	case model.Matrix:
		m := make(promql.Matrix, len(results))
		for i, result := range results {
			pts := make([]promql.FPoint, len(result.Values))
			for j, samplePair := range result.Values {
				pts[j] = promql.FPoint{
					T: int64(samplePair.Timestamp),
					F: float64(samplePair.Value),
				}
			}
			m[i] = promql.Series{
				Metric: convertMetricToLabel(result.Metric),
				Floats: pts,
			}
		}
		return m, nil

	case model.Vector:
		v := make(promql.Vector, len(results))
		for i, result := range results {
			v[i] = promql.Sample{
				T:      int64(result.Timestamp),
				F:      float64(result.Value),
				Metric: convertMetricToLabel(result.Metric),
			}
		}
		return v, nil

	default:
		return nil, fmt.Errorf("expected Prometheus results of type matrix or vector. actual results type: %v", val.Type())
	}
}

// convertV1WarningsToStorageWarnings converts v1.Warnings to storage.Warnings.
func convertV1WarningsToStorageWarnings(w v1.Warnings) storage.Warnings {
	warnings := make(storage.Warnings, len(w))
	for i, warning := range w {
		warnings[i] = errors.New(warning)
	}
	return warnings
}

// listSeriesSet implements the storage.SeriesSet interface on top of a promql.Matrix.
type listSeriesSet struct {
	m        promql.Matrix
	idx      int
	err      error
	warnings storage.Warnings
}

// Next advances the iterator and returns true if there's data to consume.
func (ss *listSeriesSet) Next() bool {
	ss.idx++
	return ss.idx < len(ss.m)
}

// At returns the current series.
func (ss *listSeriesSet) At() storage.Series {
	return promql.NewStorageSeries(ss.m[ss.idx])
}

// Err returns an error encountered while iterating.
func (ss *listSeriesSet) Err() error {
	return ss.err
}

// Warnings returns warnings encountered while iterating.
func (ss *listSeriesSet) Warnings() storage.Warnings {
	return ss.warnings
}

func newListSeriesSet(v promql.Matrix, err error, w v1.Warnings) *listSeriesSet {
	return &listSeriesSet{m: v, idx: -1, err: err, warnings: convertV1WarningsToStorageWarnings(w)}
}

// convertMatchersToPromQL converts []*labels.Matcher to a PromQL query.
func convertMatchersToPromQL(matchers []*labels.Matcher, d int64) (string, []string) {
	metricLabels := make([]string, 0, len(matchers))
	filteredMatchers := make([]string, 0, len(matchers))
	for _, m := range matchers {
		metricLabels = append(metricLabels, m.String())
		filteredMatchers = append(filteredMatchers, m.Name)
	}
	queryExpression := fmt.Sprintf("{%s}[%ds]", strings.Join(metricLabels, ", "), d)
	return queryExpression, filteredMatchers
}

// queryStorage implements storage.Queryable.
type queryStorage struct {
	api v1.API
}

// Querier provides querying access over time series data of a fixed time range.
func (s *queryStorage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
	db := &queryAccess{
		api:   s.api,
		ctx:   ctx,
		mint:  mint / 1000, // divide by 1000 to convert milliseconds to seconds.
		maxt:  maxt / 1000,
		query: QueryFunc,
	}
	return db, nil
}

// queryAccess implements storage.Querier.
type queryAccess struct {
	// storage.LabelQuerier satisfies the interface. Calling related methods will result in panic.
	storage.LabelQuerier

	api  v1.API
	mint int64
	maxt int64
	ctx  context.Context

	query func(context.Context, string, time.Time, v1.API) (parser.Value, v1.Warnings, error)
}

// Select returns a set of series that matches the given label matchers and time range.
func (db *queryAccess) Select(sort bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
	if sort || hints != nil {
		return newListSeriesSet(nil, errors.New("sorting series and select hints are not supported"), nil)
	}

	duration := db.maxt - db.mint
	if duration <= 0 { // not a valid time duration.
		return newListSeriesSet(nil, nil, nil)
	}
	queryExpression, filteredMatchers := convertMatchersToPromQL(matchers, duration)
	maxt := time.Unix(db.maxt, 0)
	v, warnings, err := db.query(db.ctx, queryExpression, maxt, db.api)
	if err != nil {
		return newListSeriesSet(nil, err, warnings)
	}
	m, ok := v.(promql.Matrix)
	if !ok {
		return newListSeriesSet(nil, fmt.Errorf("error querying Prometheus, expected type matrix response. actual type %v", v.Type()), nil)
	}

	// TODO(maxamin) GCM returns label names and values that are not in matchers.
	// Ensure results from query are equivalent to the requested matchers because
	// manager.go checks if returned labels have the same length as matchers.
	// Upstream change to prometheus code may be necessary.
	for i, sample := range m {
		m[i].Metric = sample.Metric.MatchLabels(true, filteredMatchers...)
	}
	return newListSeriesSet(m, err, warnings)
}

func (db *queryAccess) Close() error {
	return nil
}

type ruleEvaluator struct {
	ctx             context.Context
	logger          log.Logger
	version         string
	appendable      storage.Appendable
	notifierManager *notifier.Manager
	rulesMetrics    *rules.Metrics

	queryFunc         rules.QueryFunc
	rulesManager      *rules.Manager
	lastEvaluatorOpts *evaluatorOptions
	mtx               sync.Mutex
}

// Returns the URL that points to the rule-evaluator instance (set by the user). By default, or if
// using a Google Cloud base URL, this returns a link to the Google Cloud project page.
func getExternalURL(generatorURL *url.URL, projectID string) *url.URL {
	// Project ID is empty when the rule-evaluator is instantiated, before config-reloader runs.
	if generatorURL == nil || projectID == "" {
		return nil
	}
	if generatorURL.String() == googleCloudBaseURL.String() {
		// If it's a GCM link (default), create the full URL for the alert.
		return googleCloudLink(projectID, "", time.Time{}, time.Time{})
	}
	return generatorURL
}

// newRuleEvaluator constructs a rule evaluator that queries the configured Prometheus API
// endpoint, appends results to the given appendable, and sends alerts via the notifier manager.
func newRuleEvaluator(
	ctx context.Context,
	logger log.Logger,
	evaluatorOpts *evaluatorOptions,
	version string,
	appendable storage.Appendable,
	notifierManager *notifier.Manager,
	rulesMetrics *rules.Metrics,
) (*ruleEvaluator, error) {
	v1api, err := newAPI(ctx, evaluatorOpts, version)
	if err != nil {
		return nil, fmt.Errorf("query client: %w", err)
	}
	queryFunc := newQueryFunc(logger, v1api)

	rulesManager := rules.NewManager(&rules.ManagerOptions{
		ExternalURL: getExternalURL(evaluatorOpts.GeneratorURL, evaluatorOpts.ProjectID),
		QueryFunc:   queryFunc,
		Context:     ctx,
		Appendable:  appendable,
		Queryable: &queryStorage{
			api: v1api,
		},
		Logger:     logger,
		NotifyFunc: sendAlerts(notifierManager, evaluatorOpts.ProjectID, evaluatorOpts.GeneratorURL),
		Metrics:    rulesMetrics,
	})

	evaluator := ruleEvaluator{
		ctx:               ctx,
		logger:            logger,
		version:           version,
		appendable:        appendable,
		notifierManager:   notifierManager,
		rulesMetrics:      rulesMetrics,
		rulesManager:      rulesManager,
		queryFunc:         queryFunc,
		lastEvaluatorOpts: evaluatorOpts,
	}
	return &evaluator, nil
}

func (e *ruleEvaluator) ApplyConfig(cfg *config.Config, evaluatorOpts *evaluatorOptions) error {
	if evaluatorOpts != nil && !reflect.DeepEqual(evaluatorOpts, e.lastEvaluatorOpts) {
		e.lastEvaluatorOpts = evaluatorOpts

		v1api, err := newAPI(e.ctx, evaluatorOpts, e.version)
		if err != nil {
			return fmt.Errorf("query client: %w", err)
		}
		queryFunc := newQueryFunc(e.logger, v1api)
		rulesManager := rules.NewManager(&rules.ManagerOptions{
			ExternalURL: getExternalURL(evaluatorOpts.GeneratorURL, evaluatorOpts.ProjectID),
			QueryFunc:   queryFunc,
			Context:     e.ctx,
			Appendable:  e.appendable,
			Queryable: &queryStorage{
				api: v1api,
			},
			Logger:     e.logger,
			NotifyFunc: sendAlerts(e.notifierManager, evaluatorOpts.ProjectID, evaluatorOpts.GeneratorURL),
			Metrics:    e.rulesMetrics,
		})

		// Set new rule-manager and flag before stopping, so we can rerun with the new one.
		e.mtx.Lock()
		oldRuleManager := e.rulesManager
		e.rulesManager = rulesManager
		oldRuleManager.Stop()
		e.queryFunc = queryFunc
		e.mtx.Unlock()

		_, err = queryFunc(e.ctx, "vector(1)", time.Now())
		if err != nil {
			_ = level.Error(e.logger).Log("msg", "Error querying Prometheus instance", "err", err)
		}
	}

	// Get all rule files matching the configuration paths.
	var files []string
	for _, pat := range cfg.RuleFiles {
		fs, err := filepath.Glob(pat)
		if fs == nil || err != nil {
			return fmt.Errorf("retrieving rule file: %s", pat)
		}
		files = append(files, fs...)
	}
	return e.rulesManager.Update(
		time.Duration(cfg.GlobalConfig.EvaluationInterval),
		files,
		cfg.GlobalConfig.ExternalLabels,
		"",
		nil,
	)
}

func (e *ruleEvaluator) Query(ctx context.Context, q string, t time.Time) (promql.Vector, error) {
	// Copy the function in case it changes, but don't block until it completes.
	e.mtx.Lock()
	queryFunc := e.queryFunc
	e.mtx.Unlock()
	return queryFunc(ctx, q, t)
}

func (e *ruleEvaluator) Run() {
	for {
		// Copy the rule-manager before running, so we don't hold the lock.
		e.mtx.Lock()
		curr := e.rulesManager
		e.mtx.Unlock()

		// A nil indicates shutdown, otherwise it's a config update requiring restart.
		if curr == nil {
			break
		}
		curr.Run()
	}
}

func (e *ruleEvaluator) Stop() {
	e.mtx.Lock()
	defer e.mtx.Unlock()
	e.rulesManager.Stop()
}

// newQueryFunc returns a rules.QueryFunc that executes instant queries against the given
// Prometheus API client and logs any returned warnings.
func newQueryFunc(logger log.Logger, v1api v1.API) rules.QueryFunc {
	return func(ctx context.Context, q string, t time.Time) (promql.Vector, error) {
		v, warnings, err := QueryFunc(ctx, q, t, v1api)
		if len(warnings) > 0 {
			_ = level.Warn(logger).Log("msg", "Querying Prometheus instance returned warnings", "warn", warnings)
		}
		if err != nil {
			return nil, fmt.Errorf("execute query: %w", err)
		}
		vec, ok := v.(promql.Vector)
		if !ok {
			return nil, fmt.Errorf("query Prometheus, Expected type vector response. Actual type %v", v.Type())
		}
		return vec, nil
	}
}