// alerter/engine/executor.go
package engine
import (
"context"
"errors"
"fmt"
"math"
"strconv"
"strings"
"sync"
"time"
"github.com/Azure/adx-mon/alerter/alert"
"github.com/Azure/adx-mon/alerter/rules"
"github.com/Azure/adx-mon/metrics"
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/azure-kusto-go/kusto/data/table"
kustovalues "github.com/Azure/azure-kusto-go/kusto/data/value"
)
type ruleStore interface {
Rules() []*rules.Rule
}
type AlertCli interface {
Create(ctx context.Context, endpoint string, alert alert.Alert) error
}
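// loggingAlertCli is a minimal AlertCli sketch (illustrative only, not part of
// the original file): it satisfies the interface above by logging the alert
// instead of delivering it, which is handy when exercising an Executor in tests.
type loggingAlertCli struct{}

func (loggingAlertCli) Create(ctx context.Context, endpoint string, a alert.Alert) error {
	// No delivery is attempted; just record what would have been sent.
	logger.Debugf("would send alert %q to %s", a.Title, endpoint)
	return nil
}

// Executor runs one worker per rule and keeps that set of workers in sync with
// the rule store.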
type Executor struct {
alertCli AlertCli
alertAddr string
kustoClient Client
ruleStore ruleStore
region string
// tags is accessed by workers concurrently without a mutex. This is safe
// because the map is never modified after creation.
tags map[string]string
wg sync.WaitGroup
closeFn context.CancelFunc
mu sync.RWMutex
workers map[string]*worker
}
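// ExecutorOpts carries the dependencies and configuration used to construct an Executor.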
type ExecutorOpts struct {
AlertCli AlertCli
AlertAddr string
KustoClient Client
RuleStore ruleStore
Region string
Tags map[string]string
}
// TODO: make the AlertAddr string part of AlertCli
func NewExecutor(opts ExecutorOpts) *Executor {
return &Executor{
alertCli: opts.AlertCli,
alertAddr: opts.AlertAddr,
kustoClient: opts.KustoClient,
ruleStore: opts.RuleStore,
region: opts.Region,
tags: opts.Tags,
workers: make(map[string]*worker),
}
}
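// exampleRun is a hypothetical wiring sketch (not part of the original file)
// showing the intended Executor lifecycle: construct, Open, then Close on
// shutdown. The cli, kustoClient and store arguments are assumed to be
// caller-provided implementations of the interfaces above; the address,
// region and tags are placeholder values.
func exampleRun(ctx context.Context, cli AlertCli, kustoClient Client, store ruleStore) error {
	e := NewExecutor(ExecutorOpts{
		AlertCli:    cli,
		AlertAddr:   "http://localhost:8080", // hypothetical alerting endpoint
		KustoClient: kustoClient,
		RuleStore:   store,
		Region:      "eastus", // hypothetical region
		Tags:        map[string]string{"cluster": "example"},
	})
	if err := e.Open(ctx); err != nil {
		return err
	}
	defer e.Close()
	<-ctx.Done() // run until the caller cancels
	return nil
}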
func (e *Executor) Open(ctx context.Context) error {
ctx, e.closeFn = context.WithCancel(ctx)
logger.Infof("Begin executing %d queries", len(e.ruleStore.Rules()))
e.syncWorkers(ctx)
go e.periodicSync(ctx)
return nil
}
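// workerKey returns the identifier used to track a rule's worker, in the form "<namespace>/<name>".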
func (e *Executor) workerKey(rule *rules.Rule) string {
return fmt.Sprintf("%s/%s", rule.Namespace, rule.Name)
}
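// newWorker builds a worker for a single rule, giving it a lower-cased copy of
// the executor's tags.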
func (e *Executor) newWorker(rule *rules.Rule) *worker {
lowerTags := make(map[string]string, len(e.tags))
for k, v := range e.tags {
lowerTags[strings.ToLower(k)] = strings.ToLower(v)
}
return &worker{
rule: rule,
tags: lowerTags,
kustoClient: e.kustoClient,
Region: e.region,
HandlerFn: e.HandlerFn,
AlertCli: e.alertCli,
AlertAddr: fmt.Sprintf("%s/alerts", e.alertAddr),
}
}
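// Close cancels the context created in Open and waits for any outstanding work to complete.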
func (e *Executor) Close() error {
e.closeFn()
e.wg.Wait()
return nil
}
// HandlerFn converts a single row of a rule's query results into an Alert and
// submits it to the alerting service.
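// The following columns are recognized case-insensitively: Title, Description,
// Severity, Recipient, Summary and CorrelationId; any other column is copied
// into CustomFields verbatim. A rule query is therefore expected to project
// these names, for example (illustrative KQL only):
//
//	SomeTable
//	| where Level == "Error"
//	| project Title = "Errors detected", Severity = 3, Description = Message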
func (e *Executor) HandlerFn(ctx context.Context, endpoint string, qc *QueryContext, row *table.Row) error {
res := Notification{
Severity: math.MinInt64,
CustomFields: map[string]string{},
}
columns := row.ColumnNames()
for i, value := range row.Values {
switch strings.ToLower(columns[i]) {
case "title":
res.Title = value.String()
case "description":
res.Description = value.String()
case "severity":
v, err := e.asInt64(value)
if err != nil {
return &NotificationValidationError{err.Error()}
}
res.Severity = v
case "recipient":
res.Recipient = value.String()
case "summary":
res.Summary = value.String()
case "correlationid":
res.CorrelationID = value.String()
default:
res.CustomFields[columns[i]] = value.String()
}
}
if err := res.Validate(); err != nil {
return err
}
summary, err := KustoQueryLinks(res.Summary, qc.Query, endpoint, qc.Rule.Database)
if err != nil {
metrics.QueryHealth.WithLabelValues(qc.Rule.Namespace, qc.Rule.Name).Set(0)
return fmt.Errorf("failed to create kusto deep link: %w", err)
}
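// Prefix the correlation ID with the rule's source so IDs from different rules
// cannot collide, e.g. "my-ns/my-rule://abc123".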
if res.CorrelationID != "" && !strings.HasPrefix(res.CorrelationID, fmt.Sprintf("%s/%s://", qc.Rule.Namespace, qc.Rule.Name)) {
res.CorrelationID = fmt.Sprintf("%s/%s://%s", qc.Rule.Namespace, qc.Rule.Name, res.CorrelationID)
}
destination := qc.Rule.Destination
// The recipient query result column is deprecated in favor of the rule's Destination field.
if destination == "" {
logger.Warnf("Recipient query result column is deprecated. Use the rule's Destination field instead for %s/%s.", qc.Rule.Namespace, qc.Rule.Name)
destination = res.Recipient
}
a := alert.Alert{
Destination: destination,
Title: res.Title,
Summary: summary,
Description: res.Description,
Severity: int(res.Severity),
Source: fmt.Sprintf("%s/%s", qc.Rule.Namespace, qc.Rule.Name),
CorrelationID: res.CorrelationID,
CustomFields: res.CustomFields,
}
addr := fmt.Sprintf("%s/alerts", e.alertAddr)
logger.Debugf("Sending alert %s %v", addr, a)
if err := e.alertCli.Create(context.Background(), addr, a); err != nil {
if errors.Is(err, alert.ErrTooManyRequests) {
logger.Errorf("Failed to create Notification due to throttling: %s/%s", qc.Rule.Namespace, qc.Rule.Name)
// We are throttled. Return the error so the caller stops sending notifications that would also be throttled.
return err
}
logger.Errorf("Failed to create Notification: %s\n", err)
metrics.NotificationUnhealthy.WithLabelValues(qc.Rule.Namespace, qc.Rule.Name).Set(1)
return nil
}
metrics.NotificationUnhealthy.WithLabelValues(qc.Rule.Namespace, qc.Rule.Name).Set(0)
return nil
}
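// asInt64 coerces a Kusto severity value to an int64. Long, real, int and
// decimal columns are accepted, as are strings containing a base-10 integer.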
func (e *Executor) asInt64(value kustovalues.Kusto) (int64, error) {
switch t := value.(type) {
case kustovalues.Long:
return t.Value, nil
case kustovalues.Real:
return int64(t.Value), nil
case kustovalues.String:
v, err := strconv.ParseInt(t.Value, 10, 64)
if err != nil {
return 0, fmt.Errorf("failed to convert severity to int: %w", err)
}
return v, nil
case kustovalues.Int:
return int64(t.Value), nil
case kustovalues.Decimal:
v, err := strconv.ParseFloat(t.Value, 64)
if err != nil {
return 0, fmt.Errorf("failed to convert severity to int: %w", err)
}
return int64(v), nil
default:
return 0, fmt.Errorf("failed to convert severity to int: %s", value.String())
}
}
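// RunOnce executes every rule's query a single time and returns, without
// starting long-running workers or the periodic sync.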
func (e *Executor) RunOnce(ctx context.Context) {
ctx, e.closeFn = context.WithCancel(ctx)
for _, r := range e.ruleStore.Rules() {
worker := e.newWorker(r)
worker.ExecuteQuery(ctx)
}
}
// syncWorkers ensures that the workers are running for the current set of rules. If any new rules
// are added, or existing rules are updated, a new worker will be started. If any rules are deleted,
// the worker will be stopped. This function is called periodically by the executor.
func (e *Executor) syncWorkers(ctx context.Context) {
// Track the worker keys that still correspond to defined rules (CRs), so we can determine which ones were deleted.
liveQueries := make(map[string]struct{})
for _, r := range e.ruleStore.Rules() {
id := e.workerKey(r)
liveQueries[id] = struct{}{}
w, ok := e.workers[id]
if !ok {
logger.Infof("Starting new worker for %s", id)
worker := e.newWorker(r)
worker.Run(ctx)
e.workers[id] = worker
continue
}
// Rule has not changed; leave the existing worker running.
if w.rule.Version == r.Version {
continue
}
logger.Infof("Rule %s has changed, restarting worker", id)
w.Close()
delete(e.workers, id)
w = e.newWorker(r)
e.workers[id] = w
w.Run(ctx)
}
// Shut down any workers whose rules no longer exist
for id := range e.workers {
if _, ok := liveQueries[id]; !ok {
logger.Infof("Shutting down worker for %s", id)
e.workers[id].Close()
delete(e.workers, id)
}
}
}
// periodicSync re-syncs the workers with the current set of rules on a fixed interval.
func (e *Executor) periodicSync(ctx context.Context) {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
e.syncWorkers(ctx)
case <-ctx.Done():
return
}
}
}