alerter/multikustoclient/client.go (75 lines of code) (raw):
package multikustoclient
import (
"context"
"fmt"
"strings"
"github.com/Azure/adx-mon/alerter/alert"
"github.com/Azure/adx-mon/alerter/engine"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/data/table"
)
type multiKustoClient struct {
clients map[string]QueryClient
maxNotifications int
}
func New(endpoints map[string]string, configureAuth authConfiguror, max int) (multiKustoClient, error) {
clients := make(map[string]QueryClient)
for name, endpoint := range endpoints {
kcsb := kusto.NewConnectionStringBuilder(endpoint)
if strings.HasPrefix(endpoint, "https://") {
kcsb = configureAuth(kcsb.WithAzCli())
}
client, err := kusto.New(kcsb)
if err != nil {
return multiKustoClient{}, fmt.Errorf("kusto client=%s: %w", endpoint, err)
}
clients[name] = client
}
if len(clients) == 0 {
return multiKustoClient{}, fmt.Errorf("no kusto endpoints provided")
}
return multiKustoClient{clients: clients, maxNotifications: max}, nil
}
func (c multiKustoClient) Query(ctx context.Context, qc *engine.QueryContext, fn func(context.Context, string, *engine.QueryContext, *table.Row) error) (error, int) {
client := c.clients[qc.Rule.Database]
if client == nil {
return &engine.UnknownDBError{DB: qc.Rule.Database}, 0
}
var iter *kusto.RowIterator
var err error
if qc.Rule.IsMgmtQuery {
iter, err = client.Mgmt(ctx, qc.Rule.Database, qc.Stmt)
if err != nil {
return fmt.Errorf("failed to execute management kusto query=%s/%s: %w", qc.Rule.Namespace, qc.Rule.Name, err), 0
}
} else {
iter, err = client.Query(ctx, qc.Rule.Database, qc.Stmt)
if err != nil {
return fmt.Errorf("failed to execute kusto query=%s/%s: %w, %s", qc.Rule.Namespace, qc.Rule.Name, err, qc.Stmt), 0
}
}
var n int
defer iter.Stop()
if err := iter.Do(func(row *table.Row) error {
n++
if n > c.maxNotifications {
return fmt.Errorf("%s/%s returned more than %d icm, throttling query. %w", qc.Rule.Namespace, qc.Rule.Name, c.maxNotifications, alert.ErrTooManyRequests)
}
return fn(ctx, client.Endpoint(), qc, row)
}); err != nil {
return err, 0
}
return nil, n
}
func (c multiKustoClient) Endpoint(db string) string {
cl, ok := c.clients[db]
if !ok {
return "unknown"
}
return cl.Endpoint()
}
type QueryClient interface {
Query(ctx context.Context, db string, query kusto.Statement, options ...kusto.QueryOption) (*kusto.RowIterator, error)
Mgmt(ctx context.Context, db string, query kusto.Statement, options ...kusto.MgmtOption) (*kusto.RowIterator, error)
Endpoint() string
}