alerter/engine/worker.go (170 lines of code) (raw):

package engine import ( "context" "errors" "fmt" "strings" "sync" "time" "github.com/Azure/adx-mon/alerter/alert" "github.com/Azure/adx-mon/alerter/queue" "github.com/Azure/adx-mon/alerter/rules" "github.com/Azure/adx-mon/metrics" "github.com/Azure/adx-mon/pkg/logger" kerrors "github.com/Azure/azure-kusto-go/kusto/data/errors" "github.com/Azure/azure-kusto-go/kusto/data/table" ) const ( maxQueryTime = 5 * time.Minute ) type worker struct { mu sync.Mutex cancel context.CancelFunc wg sync.WaitGroup rule *rules.Rule Region string tags map[string]string kustoClient Client AlertAddr string AlertCli interface { Create(ctx context.Context, endpoint string, alert alert.Alert) error } HandlerFn func(ctx context.Context, endpoint string, qc *QueryContext, row *table.Row) error } func (e *worker) Run(ctx context.Context) { e.wg.Add(1) e.mu.Lock() ctx, e.cancel = context.WithCancel(ctx) e.mu.Unlock() go func() { defer e.wg.Done() logger.Infof("Creating query executor for %s/%s in %s executing every %s", e.rule.Namespace, e.rule.Name, e.rule.Database, e.rule.Interval.String()) // do-while e.ExecuteQuery(ctx) ticker := time.NewTicker(e.rule.Interval) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: e.ExecuteQuery(ctx) } } }() } func (e *worker) ExecuteQuery(ctx context.Context) { // Check if the rule is enabled for this instance by matching any of the alert criteria tags. var matched bool for k, v := range e.rule.Criteria { lowerKey := strings.ToLower(k) if vv, ok := e.tags[lowerKey]; ok { for _, value := range v { if strings.ToLower(vv) == strings.ToLower(value) { matched = true break } } } if matched { break } } // If tags are specified, but none of them matched, skip the query if len(e.rule.Criteria) > 0 && !matched { logger.Infof("Skipping %s/%s on %s/%s because none of the tags matched: %v", e.rule.Namespace, e.rule.Name, e.kustoClient.Endpoint(e.rule.Database), e.rule.Database, e.tags) return } // Try to acquire a worker slot queue.Workers <- struct{}{} // Release the worker slot defer func() { <-queue.Workers }() ctx, cancel := context.WithTimeout(ctx, maxQueryTime) defer cancel() start := time.Now().UTC() queryContext, err := NewQueryContext(e.rule, start, e.Region) if err != nil { logger.Errorf("Failed to wrap query=%s/%s on %s/%s: %s", e.rule.Namespace, e.rule.Name, e.kustoClient.Endpoint(e.rule.Database), e.rule.Database, err) return } logger.Infof("Executing %s/%s on %s/%s", e.rule.Namespace, e.rule.Name, e.kustoClient.Endpoint(e.rule.Database), e.rule.Database) err, rows := e.kustoClient.Query(ctx, queryContext, e.HandlerFn) if err != nil { // This failed because we sent too many notifications. if errors.Is(err, alert.ErrTooManyRequests) { err := e.AlertCli.Create(ctx, e.AlertAddr, alert.Alert{ Destination: e.rule.Destination, Title: fmt.Sprintf("Alert %s/%s has too many notifications in %s", e.rule.Namespace, e.rule.Name, e.Region), Summary: "This alert has been throttled by ICM due to too many notifications. Please reduce the number of notifications for this alert.", Severity: 3, Source: fmt.Sprintf("notification-failure/%s/%s", e.rule.Namespace, e.rule.Name), CorrelationID: fmt.Sprintf("notification-failure/%s/%s", e.rule.Namespace, e.rule.Name), }) if err != nil { logger.Errorf("Failed to send alert for throttled notification for %s/%s: %s", e.rule.Namespace, e.rule.Name, err) } return } // This failed because the query failed. logger.Errorf("Failed to execute query=%s/%s on %s/%s: %s", e.rule.Namespace, e.rule.Name, e.kustoClient.Endpoint(e.rule.Database), e.rule.Database, err) if !isUserError(err) { metrics.QueryHealth.WithLabelValues(e.rule.Namespace, e.rule.Name).Set(0) return } summary, err := KustoQueryLinks(fmt.Sprintf("This query is failing to execute:<br/><br/><pre>%s</pre><br/><br/>", err.Error()), queryContext.Query, e.kustoClient.Endpoint(e.rule.Database), e.rule.Database) if err != nil { logger.Errorf("Failed to send failure alert for %s/%s: %s", e.rule.Namespace, e.rule.Name, err) metrics.NotificationUnhealthy.WithLabelValues(e.rule.Namespace, e.rule.Name).Set(1) return } endpointBaseName, _ := strings.CutPrefix(e.kustoClient.Endpoint(e.rule.Database), "https://") err = e.AlertCli.Create(ctx, e.AlertAddr, alert.Alert{ Destination: e.rule.Destination, Title: fmt.Sprintf("Alert %s/%s has query errors on %s", e.rule.Namespace, e.rule.Name, e.kustoClient.Endpoint(e.rule.Database)), Summary: summary, Severity: 3, Source: fmt.Sprintf("%s/%s", e.rule.Namespace, e.rule.Name), CorrelationID: fmt.Sprintf("alert-failure/%s/%s/%s", endpointBaseName, e.rule.Namespace, e.rule.Name), }) if err != nil { logger.Errorf("Failed to send failure alert for %s/%s/%s: %s", endpointBaseName, e.rule.Namespace, e.rule.Name, err) // Only set the notification as failed if we are not able to send a failure alert directly. metrics.NotificationUnhealthy.WithLabelValues(e.rule.Namespace, e.rule.Name).Set(1) return } else { metrics.NotificationUnhealthy.WithLabelValues(e.rule.Namespace, e.rule.Name).Set(0) } // Query failed due to user error, so return the query to healthy. metrics.QueryHealth.WithLabelValues(e.rule.Namespace, e.rule.Name).Set(1) return } metrics.QueryHealth.WithLabelValues(e.rule.Namespace, e.rule.Name).Set(1) metrics.QueriesRunTotal.WithLabelValues().Inc() logger.Infof("Completed %s/%s in %s", e.rule.Namespace, e.rule.Name, time.Since(start)) logger.Infof("Query for %s/%s completed with %d entries found", e.rule.Namespace, e.rule.Name, rows) } func (e *worker) Close() { e.mu.Lock() cancelFn := e.cancel e.mu.Unlock() if cancelFn != nil { cancelFn() } e.wg.Wait() } func isUserError(err error) bool { if err == nil { return false } // User specified a database in their CRD that adx-mon does not have configured. var unknownDB *UnknownDBError if errors.As(err, &unknownDB) { return true } // User's query results are missing a required column, or they are the wrong type. var validationErr *NotificationValidationError if errors.As(err, &validationErr) { return true } // Look to see if a kusto query error is specific to how the query was defined and not due to problems with adx-mon itself. var kerr *kerrors.HttpError if errors.As(err, &kerr) { if kerr.Kind == kerrors.KClientArgs { return true } lowerErr := strings.ToLower(kerr.Error()) if strings.Contains(lowerErr, "sem0001") || strings.Contains(lowerErr, "semantic error") || strings.Contains(lowerErr, "request is invalid") { return true } } return false }