alerter/rules/store.go (128 lines of code) (raw):
package rules
import (
"context"
"fmt"
"strings"
"sync"
"time"
alertrulev1 "github.com/Azure/adx-mon/api/v1"
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/unsafe"
// //nolint:godot // comment does not end with a sentence // temporarily disabling code
"sigs.k8s.io/controller-runtime/pkg/client"
)
type StoreOpts struct {
Region string
CtrlCli client.Client
}
type Store struct {
ctx context.Context
cancel context.CancelFunc
ctrlCli client.Client
opts StoreOpts
wg sync.WaitGroup
mu sync.RWMutex
rules []*Rule
}
func NewStore(opts StoreOpts) *Store {
return &Store{
ctrlCli: opts.CtrlCli,
opts: opts,
}
}
func (s *Store) Open(ctx context.Context) error {
s.mu.Lock()
defer s.mu.Unlock()
s.ctx, s.cancel = context.WithCancel(ctx)
// If we have no kube client, don't try to reload rules
if s.ctrlCli == nil {
return nil
}
rules, err := s.reloadRules()
if err != nil {
return err
}
s.rules = rules
go s.reloadPeriodically()
return nil
}
func (s *Store) Close() error {
s.cancel()
s.wg.Wait()
return nil
}
func (s *Store) Rules() []*Rule {
s.mu.RLock()
defer s.mu.RUnlock()
return s.rules
}
func toRule(r alertrulev1.AlertRule, region string) (*Rule, error) {
rule := &Rule{
Version: r.ResourceVersion,
Database: r.Spec.Database,
Namespace: r.Namespace,
Name: r.Name,
Interval: r.Spec.Interval.Duration,
Query: r.Spec.Query,
Destination: r.Spec.Destination,
AutoMitigateAfter: r.Spec.AutoMitigateAfter.Duration,
Criteria: r.Spec.Criteria,
IsMgmtQuery: false,
}
// If a query starts with a dot then it is acting against that Kusto cluster and not looking through
// rows in any particular table. So we don't want to wrap the query with the ParamRegion query_parameter()
// declaration because then Kusto will say it's an invalid query.
rule.IsMgmtQuery = strings.HasPrefix(r.Spec.Query, ".")
stmt := kusto.NewStmt(``, kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(r.Spec.Query)
rule.Stmt = stmt
return rule, nil
}
func (s *Store) reloadRules() ([]*Rule, error) {
ruleList := &alertrulev1.AlertRuleList{}
if err := s.ctrlCli.List(context.Background(), ruleList); err != nil {
return nil, fmt.Errorf("failed to list alert rules: %w", err)
}
var rules = make([]*Rule, 0, len(ruleList.Items))
for _, r := range ruleList.Items {
rule, err := toRule(r, s.opts.Region)
if err != nil {
return nil, err
}
rules = append(rules, rule)
}
return rules, nil
}
func (s *Store) reloadPeriodically() {
s.wg.Add(1)
defer s.wg.Done()
for {
select {
case <-s.ctx.Done():
return
case <-time.After(time.Minute):
logger.Infof("Reloading rules...")
rules, err := s.reloadRules()
if err != nil {
logger.Errorf("failed to reload rules: %s", err)
continue
}
s.mu.Lock()
s.rules = rules
s.mu.Unlock()
}
}
}
func (s *Store) Register(rule *Rule) {
s.mu.Lock()
defer s.mu.Unlock()
s.rules = append(s.rules, rule)
}
// Rule is analogous to a kusto-to-metric configuration, containing
// definitions and using the parlance found in the k2m UI.
type Rule struct {
Version string
Namespace string
Name string
Database string
Interval time.Duration
Query string
AutoMitigateAfter time.Duration
Destination string
// Criteria is a map of key-value pairs that are used to determine where an alert can execute.
Criteria map[string][]string
// Management queries (starts with a dot) have to call a different
// query API in the Kusto Go SDK.
IsMgmtQuery bool
// Stmt specifies the underlayEtcdPeersQuery to execute.
Stmt kusto.Stmt
}