alerter/service.go (183 lines of code) (raw):

package alerter import ( "context" "fmt" "net/http" "os" "sync" "time" "github.com/Azure/adx-mon/alerter/alert" "github.com/Azure/adx-mon/alerter/engine" "github.com/Azure/adx-mon/alerter/multikustoclient" "github.com/Azure/adx-mon/metrics" "github.com/Azure/adx-mon/pkg/logger" "github.com/Azure/azure-kusto-go/kusto" "github.com/prometheus/client_golang/prometheus/promhttp" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/Azure/adx-mon/alerter/rules" ) type AlerterOpts struct { Dev bool KustoEndpoints map[string]string Region string AlertAddr string Cloud string Port int Concurrency int Tags map[string]string // MaxNotifications is the maximum number of notifications to send per rule. MaxNotifications int // Managed Identity options MSIID string MSIResource string // Application Token options KustoToken string CtrlCli client.Client } // share with executor or fine for both to define privately? type ruleStore interface { Rules() []*rules.Rule Open(context.Context) error Close() error } type Alerter struct { clients map[string]KustoClient queue chan struct{} alertCli *alert.Client opts *AlerterOpts wg sync.WaitGroup executor *engine.Executor ctx context.Context closeFn context.CancelFunc CtrlCli client.Client ruleStore ruleStore } type KustoClient interface { Mgmt(ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error) Query(ctx context.Context, db string, query kusto.Stmt, options ...kusto.QueryOption) (*kusto.RowIterator, error) Endpoint() string } func NewService(opts *AlerterOpts) (*Alerter, error) { ruleStore := rules.NewStore(rules.StoreOpts{ Region: opts.Region, CtrlCli: opts.CtrlCli, }) l2m := &Alerter{ opts: opts, queue: make(chan struct{}, opts.Concurrency), CtrlCli: opts.CtrlCli, ruleStore: ruleStore, clients: make(map[string]KustoClient), } if opts.MSIID != "" { logger.Infof("Using MSI ID=%s", opts.MSIID) } authConfigure, err := multikustoclient.GetAuth(multikustoclient.MsiAuth(opts.MSIID), multikustoclient.TokenAuth("https://kusto.kusto.windows.net", opts.KustoToken), multikustoclient.DefaultAuth()) if err != nil { return nil, fmt.Errorf("failed to get auth: %w", err) } kclient, err := multikustoclient.New(opts.KustoEndpoints, authConfigure, opts.MaxNotifications) if err != nil { return nil, err } if opts.CtrlCli == nil { logger.Warnf("No kusto endpoints provided, using fake kusto clients") fakeRule := &rules.Rule{ Namespace: "fake-namespace", Name: "FakeRule", Database: "FakeDB", Interval: time.Minute, Query: `UnderlayNodeInfo | where Region == ParamRegion | limit 1 | project Title="test"`, } l2m.clients[fakeRule.Database] = fakeKustoClient{endpoint: "http://fake.endpoint"} ruleStore.Register(fakeRule) } if opts.AlertAddr == "" { logger.Warnf("No alert address provided, using fake alert handler") http.Handle("/alerts", fakeAlertHandler()) opts.AlertAddr = fmt.Sprintf("http://localhost:%d", opts.Port) } alertCli, err := alert.NewClient(time.Minute) if err != nil { return nil, fmt.Errorf("failed to create alert client: %w", err) } l2m.alertCli = alertCli executor := engine.NewExecutor( engine.ExecutorOpts{ AlertCli: alertCli, AlertAddr: opts.AlertAddr, Region: opts.Region, Tags: opts.Tags, KustoClient: kclient, RuleStore: ruleStore, }) l2m.executor = executor return l2m, nil } func Lint(ctx context.Context, opts *AlerterOpts, path string) error { ruleStore, err := rules.FromPath(path, opts.Region) if err != nil { return err } logger.Infof("Linting rules from path=%s", path) lint := NewLinter() authConfigure, err := multikustoclient.GetAuth(multikustoclient.MsiAuth(opts.MSIID), multikustoclient.TokenAuth("https://kusto.kusto.windows.net", opts.KustoToken), multikustoclient.DefaultAuth()) if err != nil { return fmt.Errorf("failed to get auth: %w", err) } kclient, err := multikustoclient.New(opts.KustoEndpoints, authConfigure, opts.MaxNotifications) if err != nil { return err } executor := engine.NewExecutor(engine.ExecutorOpts{ AlertCli: lint, AlertAddr: "http://fake.microsoft.com", KustoClient: kclient, RuleStore: ruleStore, Region: opts.Region, Tags: opts.Tags, }) executor.RunOnce(ctx) if lint.HasFailedQueries() { return fmt.Errorf("failed to lint rules") } lint.Log() return nil } func (l *Alerter) Open(ctx context.Context) error { l.ctx, l.closeFn = context.WithCancel(ctx) logger.Infof("Starting adx-mon alerter") if err := l.ruleStore.Open(ctx); err != nil { return fmt.Errorf("failed to open rule store: %w", err) } if err := l.executor.Open(ctx); err != nil { return fmt.Errorf("failed to open executor: %w", err) } go func() { logger.Infof("Listening at :%d", l.opts.Port) http.Handle("/metrics", promhttp.Handler()) if err := http.ListenAndServe(fmt.Sprintf(":%d", l.opts.Port), nil); err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } }() go func() { ticker := time.NewTicker(time.Minute) defer ticker.Stop() for { select { case <-l.ctx.Done(): return case <-ticker.C: metrics.AlerterHealthCheck.WithLabelValues(l.opts.Region).Set(1) } } }() return nil } func (l *Alerter) Close() error { l.closeFn() if err := l.executor.Close(); err != nil { return fmt.Errorf("failed to close executor: %w", err) } if err := l.ruleStore.Close(); err != nil { return fmt.Errorf("failed to close rule store: %w", err) } return nil }