cmd/alerter/main.go (141 lines of code) (raw):
package main
import (
"context"
"fmt"
"os"
"os/signal"
"strings"
"syscall"
"github.com/Azure/adx-mon/alerter"
alertrulev1 "github.com/Azure/adx-mon/api/v1"
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/adx-mon/pkg/version"
"github.com/urfave/cli/v2" // imports as package "cli"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/clientcmd"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
)
func main() {
app := &cli.App{
Name: "alerter",
Usage: "adx-mon alerting engine for ADX",
Flags: []cli.Flag{
&cli.StringSliceFlag{Name: "kusto-endpoint", Usage: "Kusto endpoint in the format of <name>=<endpoint>"},
&cli.StringFlag{Name: "kubeconfig", Usage: "/etc/kubernetes/admin.conf"},
&cli.IntFlag{Name: "port", Value: 4023, Usage: "Metrics port number"},
&cli.StringFlag{Name: "auth-msi-id", Usage: "MSI client ID for authentication to Kusto"},
&cli.StringFlag{Name: "auth-token", Usage: "Application token for authentication to Kusto"},
&cli.StringFlag{Name: "cloud", Usage: "Azure cloud"},
&cli.StringFlag{Name: "region", Usage: "Current region"},
&cli.StringFlag{Name: "alerter-address", Usage: "Address of the alert notification service"},
&cli.IntFlag{Name: "concurrency", Value: 10, Usage: "Number of concurrent queries to run"},
&cli.IntFlag{Name: "max-notifications", Value: 25, Usage: "Maximum number of notifications to send per rule"},
&cli.StringSliceFlag{Name: "tag", Usage: "Tag in the format of <key>=<value> that applies to execution context"},
},
Action: realMain,
Commands: []*cli.Command{
NewLintCommand(),
},
Version: version.String(),
}
if err := app.Run(os.Args); err != nil {
logger.Fatalf(err.Error())
}
}
func realMain(ctx *cli.Context) error {
logger.Infof("%s version:%s", os.Args[0], version.String())
endpoints := make(map[string]string)
endpointsArg := ctx.StringSlice("kusto-endpoint")
for _, v := range endpointsArg {
parts := strings.Split(v, "=")
if len(parts) != 2 {
return cli.Exit("Invalid kusto-endpoint format, expected <name>=<endpoint>", 1)
}
endpoints[parts[0]] = parts[1]
}
tags := make(map[string]string)
tagsArg := ctx.StringSlice("tag")
for _, v := range tagsArg {
parts := strings.Split(v, "=")
if len(parts) != 2 {
return cli.Exit("Invalid tag format, expected <key>=<value>", 1)
}
tags[strings.ToLower(parts[0])] = strings.ToLower(parts[1])
}
// Always add region and cloud tags which are required params for alerter currently.
tags["region"] = strings.ToLower(ctx.String("region"))
tags["cloud"] = strings.ToLower(ctx.String("cloud"))
for k, v := range tags {
logger.Infof("Using tag %s=%s", k, v)
}
scheme := clientgoscheme.Scheme
if err := clientgoscheme.AddToScheme(scheme); err != nil {
return err
}
if err := alertrulev1.AddToScheme(scheme); err != nil {
return err
}
_, _, ctrlCli, err := newKubeClient(ctx)
if err != nil {
return err
}
opts := &alerter.AlerterOpts{
Port: ctx.Int("port"),
KustoEndpoints: endpoints,
Region: ctx.String("region"),
Cloud: ctx.String("cloud"),
AlertAddr: ctx.String("alerter-address"),
Concurrency: ctx.Int("concurrency"),
MaxNotifications: ctx.Int("max-notifications"),
MSIID: ctx.String("auth-msi-id"),
KustoToken: ctx.String("auth-token"),
Tags: tags,
CtrlCli: ctrlCli,
}
svcCtx, cancel := context.WithCancel(context.Background())
defer cancel()
if ctx.String("lint-dir") != "" {
return alerter.Lint(svcCtx, opts, ctx.String("lint-dir"))
}
svc, err := alerter.NewService(opts)
if err != nil {
return err
}
if err := svc.Open(svcCtx); err != nil {
return err
}
sc := make(chan os.Signal, 1)
signal.Notify(sc, os.Interrupt, syscall.SIGTERM)
go func() {
sig := <-sc
cancel()
logger.Infof("Received signal %s, exiting...", sig.String())
// Shutdown the server and cancel context
err := svc.Close()
if err != nil {
logger.Errorf(err.Error())
}
}()
<-svcCtx.Done()
return nil
}
func newKubeClient(cCtx *cli.Context) (dynamic.Interface, *kubernetes.Clientset, ctrlclient.Client, error) {
config, err := clientcmd.BuildConfigFromFlags("", cCtx.String("kubeconfig"))
if err != nil {
logger.Warnf("No kube config provided, using fake kube client")
return nil, nil, nil, fmt.Errorf("unable to find kube config [%s]: %v", cCtx.String("kubeconfig"), err)
}
client, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, nil, nil, fmt.Errorf("unable to build kube config: %v", err)
}
dyCli, err := dynamic.NewForConfig(config)
if err != nil {
return nil, nil, nil, fmt.Errorf("unable to build dynamic client: %v", err)
}
ctrlCli, err := ctrlclient.New(config, ctrlclient.Options{})
if err != nil {
return nil, nil, nil, err
}
return dyCli, client, ctrlCli, nil
}