func()

in alerter/engine/executor.go [105:184]


// HandlerFn converts a single query result row into an alert and submits it
// to the alert endpoint configured on the Executor.
//
// Row columns are matched case-insensitively against the well-known names
// "title", "description", "severity", "recipient", "summary" and
// "correlationid"; every other column is copied into the alert's custom
// fields under its original column name.
//
// Returned errors:
//   - *NotificationValidationError when the severity column cannot be
//     converted by e.asInt64;
//   - whatever Notification.Validate reports for the assembled notification;
//   - a wrapped error when KustoQueryLinks fails (the rule's QueryHealth
//     gauge is set to 0 in that case);
//   - the client error when it matches alert.ErrTooManyRequests, so the
//     caller can stop sending notifications that would only be throttled.
//
// Any other failure from the alert client is logged and recorded in the
// NotificationUnhealthy gauge, and nil is returned.
func (e *Executor) HandlerFn(ctx context.Context, endpoint string, qc *QueryContext, row *table.Row) error {
	namespace, name := qc.Rule.Namespace, qc.Rule.Name
	// "<namespace>/<name>" identifies the rule; it is used both as the alert
	// source and as the base of the correlation ID prefix.
	source := fmt.Sprintf("%s/%s", namespace, name)

	res := Notification{
		// NOTE(review): MinInt64 presumably acts as a "severity not set"
		// sentinel for Validate — confirm against Notification.Validate.
		Severity:     math.MinInt64,
		CustomFields: map[string]string{},
	}

	columns := row.ColumnNames()
	for i, value := range row.Values {
		switch strings.ToLower(columns[i]) {
		case "title":
			res.Title = value.String()
		case "description":
			res.Description = value.String()
		case "severity":
			v, err := e.asInt64(value)
			if err != nil {
				return &NotificationValidationError{err.Error()}
			}
			res.Severity = v
		case "recipient":
			res.Recipient = value.String()
		case "summary":
			res.Summary = value.String()
		case "correlationid":
			res.CorrelationID = value.String()
		default:
			// Unknown columns keep their original (non-lowercased) name.
			res.CustomFields[columns[i]] = value.String()
		}
	}

	if err := res.Validate(); err != nil {
		return err
	}

	summary, err := KustoQueryLinks(res.Summary, qc.Query, endpoint, qc.Rule.Database)
	if err != nil {
		metrics.QueryHealth.WithLabelValues(namespace, name).Set(0)
		return fmt.Errorf("failed to create kusto deep link: %w", err)
	}

	// Scope a non-empty correlation ID to this rule, unless the query already
	// produced it with the rule prefix.
	if prefix := source + "://"; res.CorrelationID != "" && !strings.HasPrefix(res.CorrelationID, prefix) {
		res.CorrelationID = prefix + res.CorrelationID
	}

	destination := qc.Rule.Destination
	// The recipient query results field is deprecated.
	if destination == "" {
		logger.Warnf("Recipient query results field is deprecated. Please use the destination field in the rule instead for %s/%s.", namespace, name)
		destination = res.Recipient
	}

	a := alert.Alert{
		Destination:   destination,
		Title:         res.Title,
		Summary:       summary,
		Description:   res.Description,
		Severity:      int(res.Severity),
		Source:        source,
		CorrelationID: res.CorrelationID,
		CustomFields:  res.CustomFields,
	}

	addr := fmt.Sprintf("%s/alerts", e.alertAddr)
	logger.Debugf("Sending alert %s %v", addr, a)

	// Use the caller's context (not context.Background()) so cancellation and
	// deadlines propagate to the alert request.
	if err := e.alertCli.Create(ctx, addr, a); err != nil {
		if errors.Is(err, alert.ErrTooManyRequests) {
			logger.Errorf("Failed to create Notification due to throttling: %s/%s", namespace, name)
			// We are throttled. Bail out of this loop so we stop trying to send notifications that will just be throttled.
			return err
		}

		// Non-throttling failures are recorded but deliberately not returned.
		logger.Errorf("Failed to create Notification: %s\n", err)
		metrics.NotificationUnhealthy.WithLabelValues(namespace, name).Set(1)
		return nil
	}
	metrics.NotificationUnhealthy.WithLabelValues(namespace, name).Set(0)

	return nil
}