cmd/ingestor/main.go

package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"net/http/pprof"
	"os"
	"os/signal"
	"runtime"
	"strings"
	"syscall"
	"time"

	v1 "github.com/Azure/adx-mon/api/v1"
	"github.com/Azure/adx-mon/ingestor"
	"github.com/Azure/adx-mon/ingestor/adx"
	runner "github.com/Azure/adx-mon/ingestor/runner/shutdown"
	"github.com/Azure/adx-mon/metrics"
	"github.com/Azure/adx-mon/pkg/limiter"
	"github.com/Azure/adx-mon/pkg/logger"
	"github.com/Azure/adx-mon/pkg/scheduler"
	"github.com/Azure/adx-mon/pkg/tls"
	"github.com/Azure/adx-mon/pkg/version"
	"github.com/Azure/adx-mon/schema"
	"github.com/Azure/azure-kusto-go/kusto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/urfave/cli/v2"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
)

func main() {
	app := &cli.App{
		Name:  "ingestor",
		Usage: "adx-mon metrics ingestor",
		Flags: []cli.Flag{
			&cli.StringFlag{Name: "kubeconfig", Usage: "Path to kubeconfig, e.g. /etc/kubernetes/admin.conf"},
			&cli.StringFlag{Name: "namespace", Usage: "Namespace for peer discovery"},
			&cli.StringFlag{Name: "hostname", Usage: "Hostname of the current node"},
			&cli.StringFlag{Name: "region", Usage: "Current region"},
			&cli.StringFlag{Name: "storage-dir", Usage: "Directory to store WAL segments"},
			&cli.StringSliceFlag{Name: "metrics-kusto-endpoints", Usage: "Kusto endpoint in the format of <db>=<endpoint> for metrics storage"},
			&cli.StringSliceFlag{Name: "logs-kusto-endpoints", Usage: "Kusto endpoint in the format of <db>=<endpoint>, handles OTLP logs"},
			&cli.BoolFlag{Name: "disable-peer-transfer", Usage: "Disable segment transfers to peers"},
			&cli.IntFlag{Name: "uploads", Usage: "Number of concurrent uploads", Value: adx.ConcurrentUploads},
			&cli.UintFlag{Name: "max-connections", Usage: "Max number of concurrent connections allowed. 0 for no limit", Value: 1000},
			&cli.Int64Flag{Name: "max-segment-size", Usage: "Maximum segment size in bytes", Value: 1024 * 1024 * 1024},
			&cli.Int64Flag{Name: "max-transfer-size", Usage: "Maximum segment size in bytes allowed for segment transfers", Value: 100 * 1024 * 1024},
			&cli.Int64Flag{Name: "max-disk-usage", Usage: "Maximum disk space usage allowed before signaling back-pressure", Value: 10 * 1024 * 1024 * 1024},
			&cli.Int64Flag{Name: "max-segment-count", Usage: "Maximum segment files allowed before signaling back-pressure", Value: 10000},
			&cli.DurationFlag{Name: "max-transfer-age", Usage: "Maximum age of a segment before direct kusto upload", Value: 90 * time.Second},
			&cli.DurationFlag{Name: "max-segment-age", Usage: "Maximum segment age", Value: 5 * time.Minute},
			&cli.StringSliceFlag{Name: "drop-prefix", Usage: "Drop transfers that match the file prefix. Transfer filenames are in the form of DestinationDB_Table_..."},
			&cli.IntFlag{Name: "max-batch-segments", Usage: "Maximum number of segments per batch", Value: 25},
			&cli.BoolFlag{Name: "enable-wal-fsync", Usage: "Enable WAL fsync", Value: false},
			&cli.IntFlag{Name: "max-transfer-concurrency", Usage: "Maximum transfer requests in flight", Value: 50},
			&cli.IntFlag{Name: "partition-size", Usage: "Maximum number of nodes in a partition", Value: 25},
			&cli.DurationFlag{Name: "slow-request-threshold", Usage: "Threshold for slow requests. Set to 0 to disable.", Value: 10 * time.Second},
			&cli.StringFlag{Name: "ca-cert", Usage: "CA certificate file"},
			&cli.StringFlag{Name: "key", Usage: "Server key file"},
			&cli.BoolFlag{Name: "insecure-skip-verify", Usage: "Skip TLS verification"},
		},
		Action: func(ctx *cli.Context) error {
			return realMain(ctx)
		},
		Version: version.String(),
	}

	if err := app.Run(os.Args); err != nil {
		logger.Fatalf(err.Error())
	}
}

// realMain wires up the Kusto uploaders, ingestor service, HTTP listeners and
// signal handling, then blocks until the service context is cancelled.
func realMain(ctx *cli.Context) error {
	logger.Infof("%s version:%s", os.Args[0], version.String())

	svcCtx, cancel := context.WithCancel(context.Background())
	defer cancel()

	scheme := clientgoscheme.Scheme
	if err := clientgoscheme.AddToScheme(scheme); err != nil {
		return err
	}
	if err := v1.AddToScheme(scheme); err != nil {
		return err
	}

	_, k8scli, ctrlCli, err := newKubeClient(ctx)
	if err != nil {
		return err
	}

	runtime.MemProfileRate = 4096
	// Block profile rate is in nanoseconds: sample roughly one blocking event per second of blocked time.
	runtime.SetBlockProfileRate(int(1 * time.Second))
	runtime.SetMutexProfileFraction(1)

	var (
		storageDir                              string
		cacert, key                             string
		insecureSkipVerify, disablePeerTransfer bool
		concurrentUploads                       int
		maxConns                                int
		maxSegmentSize, maxTransferSize         int64
		maxSegmentAge, maxTransferAge           time.Duration
	)

	storageDir = ctx.String("storage-dir")
	concurrentUploads = ctx.Int("uploads")
	maxSegmentSize = ctx.Int64("max-segment-size")
	maxSegmentAge = ctx.Duration("max-segment-age")
	maxTransferSize = ctx.Int64("max-transfer-size")
	maxTransferAge = ctx.Duration("max-transfer-age")
	maxSegmentCount := ctx.Int64("max-segment-count")
	dropPrefixes := ctx.StringSlice("drop-prefix")
	maxDiskUsage := ctx.Int64("max-disk-usage")
	maxBatchSegments := ctx.Int("max-batch-segments")
	partitionSize := ctx.Int("partition-size")
	maxConns = int(ctx.Uint("max-connections"))
	cacert = ctx.String("ca-cert")
	key = ctx.String("key")
	insecureSkipVerify = ctx.Bool("insecure-skip-verify")
	namespace := ctx.String("namespace")
	hostname := ctx.String("hostname")
	region := ctx.String("region")
	disablePeerTransfer = ctx.Bool("disable-peer-transfer")
	maxTransferConcurrency := ctx.Int("max-transfer-concurrency")
	enableWALFsync := ctx.Bool("enable-wal-fsync")
	slowRequestThreshold := ctx.Duration("slow-request-threshold")

	if namespace == "" {
		nsBytes, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
		if err == nil {
			namespace = strings.TrimSpace(string(nsBytes))
		}
	}

	if hostname == "" {
		hostname, err = os.Hostname()
		if err != nil {
			logger.Errorf("Failed to get hostname: %s", err)
		}
	}

	if cacert != "" || key != "" {
		if cacert == "" || key == "" {
			logger.Fatalf("Both --ca-cert and --key are required")
		}
	} else {
		logger.Warnf("Using fake TLS credentials (not for production use!)")
		certBytes, keyBytes, err := tls.NewFakeTLSCredentials()
		if err != nil {
			logger.Fatalf("Failed to create fake TLS credentials: %s", err)
		}

		certFile, err := os.CreateTemp("", "cert")
		if err != nil {
			logger.Fatalf("Failed to create cert temp file: %s", err)
		}
		if _, err := certFile.Write(certBytes); err != nil {
			logger.Fatalf("Failed to write cert temp file: %s", err)
		}

		keyFile, err := os.CreateTemp("", "key")
		if err != nil {
			logger.Fatalf("Failed to create key temp file: %s", err)
		}
		if _, err := keyFile.Write(keyBytes); err != nil {
			logger.Fatalf("Failed to write key temp file: %s", err)
		}

		cacert = certFile.Name()
		key = keyFile.Name()
		insecureSkipVerify = true

		if err := certFile.Close(); err != nil {
			logger.Fatalf("Failed to close cert temp file: %s", err)
		}
		if err := keyFile.Close(); err != nil {
			logger.Fatalf("Failed to close key temp file: %s", err)
		}

		defer func() {
			os.Remove(certFile.Name())
			os.Remove(keyFile.Name())
		}()
	}

	logger.Infof("Using TLS ca-cert=%s key=%s", cacert, key)

	if storageDir == "" {
		logger.Fatalf("--storage-dir is required")
	}

	var (
		allowedDatabases                []string
		metricsUploaders, logsUploaders []adx.Uploader
		metricsDatabases, logsDatabases []string
	)

	metricsKusto := ctx.StringSlice("metrics-kusto-endpoints")
	if len(metricsKusto) > 0 {
		metricsUploaders, metricsDatabases, err = newUploaders(
			metricsKusto, storageDir, concurrentUploads,
			schema.DefaultMetricsMapping, adx.PromMetrics)
		if err != nil {
			logger.Fatalf("Failed to create uploader: %s", err)
		}
	} else {
		logger.Warnf("No kusto endpoint provided, using fake metrics uploader")
		uploader := adx.NewFakeUploader("FakeMetrics")
		metricsUploaders = append(metricsUploaders, uploader)
		metricsDatabases = append(metricsDatabases, uploader.Database())
	}

	logsKusto := ctx.StringSlice("logs-kusto-endpoints")
	if len(logsKusto) > 0 {
		logsUploaders, logsDatabases, err = newUploaders(
			logsKusto, storageDir, concurrentUploads,
			schema.DefaultLogsMapping, adx.OTLPLogs)
		if err != nil {
			logger.Fatalf("Failed to create uploaders for OTLP logs: %s", err)
		}
	} else {
		logger.Warnf("No kusto endpoint provided, using fake logs uploader")
		uploader := adx.NewFakeUploader("FakeLogs")
		logsUploaders = append(logsUploaders, uploader)
		logsDatabases = append(logsDatabases, uploader.Database())
	}

	allowedDatabases = append(allowedDatabases, metricsDatabases...)
	allowedDatabases = append(allowedDatabases, logsDatabases...)

	uploadDispatcher := adx.NewDispatcher(append(logsUploaders, metricsUploaders...))
	if err := uploadDispatcher.Open(svcCtx); err != nil {
		logger.Fatalf("Failed to start upload dispatcher: %s", err)
	}
	defer uploadDispatcher.Close()

	var metricsKustoCli []metrics.StatementExecutor
	for _, cli := range metricsUploaders {
		metricsKustoCli = append(metricsKustoCli, cli)
	}

	var logsKustoCli []metrics.StatementExecutor
	for _, cli := range logsUploaders {
		logsKustoCli = append(logsKustoCli, cli)
	}

	svc, err := ingestor.NewService(ingestor.ServiceOpts{
		K8sCli:                 k8scli,
		K8sCtrlCli:             ctrlCli,
		LogsKustoCli:           logsKustoCli,
		MetricsKustoCli:        metricsKustoCli,
		MetricsDatabases:       metricsDatabases,
		AllowedDatabase:        allowedDatabases,
		LogsDatabases:          logsDatabases,
		Namespace:              namespace,
		Hostname:               hostname,
		Region:                 region,
		StorageDir:             storageDir,
		Uploader:               uploadDispatcher,
		DisablePeerTransfer:    disablePeerTransfer,
		PartitionSize:          partitionSize,
		MaxSegmentSize:         maxSegmentSize,
		MaxSegmentAge:          maxSegmentAge,
		MaxTransferSize:        maxTransferSize,
		MaxTransferAge:         maxTransferAge,
		MaxSegmentCount:        maxSegmentCount,
		MaxDiskUsage:           maxDiskUsage,
		MaxBatchSegments:       maxBatchSegments,
		EnableWALFsync:         enableWALFsync,
		MaxTransferConcurrency: maxTransferConcurrency,
		InsecureSkipVerify:     insecureSkipVerify,
		DropFilePrefixes:       dropPrefixes,
		SlowRequestThreshold:   slowRequestThreshold.Seconds(),
	})
	if err != nil {
		logger.Fatalf("Failed to create service: %s", err)
	}

	if err := svc.Open(svcCtx); err != nil {
		logger.Fatalf("Failed to start service: %s", err)
	}

	l, err := net.Listen("tcp", ":9090")
	if err != nil {
		logger.Fatalf("Failed to listen: %s", err)
	}
	if maxConns > 0 {
		logger.Infof("Limiting connections to %d", maxConns)
		l = limiter.LimitListener(l, maxConns)
	}
	defer l.Close()
	logger.Infof("Listening at %s", ":9090")

	mux := http.NewServeMux()
	mux.HandleFunc("/transfer", svc.HandleTransfer)
	mux.HandleFunc("/readyz", svc.HandleReady)

	logger.Infof("Metrics Listening at %s", ":9091")
	metricsMux := http.NewServeMux()
	metricsMux.Handle("/metrics", promhttp.Handler())
metricsMux.HandleFunc("/debug/pprof/", pprof.Index) metricsMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) metricsMux.HandleFunc("/debug/pprof/profile", pprof.Profile) metricsMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) metricsMux.HandleFunc("/debug/pprof/trace", pprof.Trace) metricsMux.HandleFunc("/debug/store", svc.HandleDebugStore) srv := &http.Server{ Handler: mux, // Close idle connections fairly often to establish new connections through the load balancer // so that long-lived connections don't stay pinned to the same node indefinitely. IdleTimeout: 5 * time.Minute, } srv.ErrorLog = newLogger() go func() { // Under high connection load and when the server is doing a lot of IO, this // can cause the server to be unresponsive. This pins the accept goroutine // to a single CPU to reduce context switching and improve performance. runtime.LockOSThread() if err := pinToCPU(0); err != nil { logger.Warnf("Failed to pin to CPU: %s", err) } if err := srv.ServeTLS(l, cacert, key); err != nil { logger.Errorf(err.Error()) } }() metricsSrv := &http.Server{Addr: ":9091", Handler: metricsMux} metricsSrv.ErrorLog = newLogger() go func() { if err := metricsSrv.ListenAndServe(); err != nil { logger.Errorf(err.Error()) } }() // Capture SIGINT and SIGTERM to trigger a shutdown and upload/transfer of all pending segments. // This is best-effort, if the process is killed with SIGKILL or the shutdown takes too long // the segments will be delayed until the process is restarted. sc := make(chan os.Signal, 1) signal.Notify(sc, os.Interrupt, syscall.SIGTERM) go func() { sig := <-sc // Stop receiving new samples srv.Shutdown(context.Background()) // Disable writes for internal processes (metrics) if err := svc.DisableWrites(); err != nil { logger.Errorf("Failed to disable writes: %s", err) } logger.Infof("Received signal %s, uploading pending segments...", sig.String()) // Trigger shutdown of any pending background processes cancel() if err := metricsSrv.Shutdown(context.Background()); err != nil { logger.Errorf("Failed to shutdown metrics server: %s", err) } // Shutdown the server and cancel context err := svc.Close() if err != nil { logger.Errorf(err.Error()) } }() // Only start the shutdown runner if running in a cluster if _, err := rest.InClusterConfig(); err == nil { go func() { sd := runner.NewShutDownRunner(k8scli, srv, svc) scheduler.RunForever(svcCtx, time.Minute, sd) }() } <-svcCtx.Done() return nil } func newKubeClient(cCtx *cli.Context) (dynamic.Interface, kubernetes.Interface, ctrlclient.Client, error) { kubeconfig := cCtx.String("kubeconfig") _, err := rest.InClusterConfig() if err == rest.ErrNotInCluster && kubeconfig == "" && os.Getenv("KUBECONIFG=") == "" { logger.Warnf("No kube config provided, using fake kube client") return nil, fake.NewSimpleClientset(), nil, nil } config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) if err != nil { return nil, nil, nil, fmt.Errorf("unable to find kube config [%s]: %v", kubeconfig, err) } client, err := kubernetes.NewForConfig(config) if err != nil { return nil, nil, nil, fmt.Errorf("unable to build kube config: %v", err) } dyCli, err := dynamic.NewForConfig(config) if err != nil { return nil, nil, nil, fmt.Errorf("unable to build dynamic client: %v", err) } ctrlCli, err := ctrlclient.New(config, ctrlclient.Options{}) if err != nil { return nil, nil, nil, err } return dyCli, client, ctrlCli, nil } func newKustoClient(endpoint string) (*kusto.Client, error) { kcsb := kusto.NewConnectionStringBuilder(endpoint) if 
		kcsb.WithDefaultAzureCredential()
	}

	return kusto.New(kcsb)
}

// parseKustoEndpoint splits a <db>=<endpoint> value and returns the endpoint
// address and database name.
func parseKustoEndpoint(kustoEndpoint string) (string, string, error) {
	if !strings.Contains(kustoEndpoint, "=") {
		return "", "", fmt.Errorf("invalid kusto endpoint: %s", kustoEndpoint)
	}

	split := strings.Split(kustoEndpoint, "=")
	database := split[0]
	addr := split[1]
	if database == "" {
		return "", "", fmt.Errorf("-db is required")
	}

	return addr, database, nil
}

// newUploaders creates a Kusto uploader per <db>=<endpoint> value and returns the
// uploaders along with the database names they upload to.
func newUploaders(endpoints []string, storageDir string, concurrentUploads int,
	defaultMapping schema.SchemaMapping, sampleType adx.SampleType) ([]adx.Uploader, []string, error) {

	var uploaders []adx.Uploader
	var uploadDatabaseNames []string
	for _, endpoint := range endpoints {
		addr, database, err := parseKustoEndpoint(endpoint)
		if err != nil {
			return nil, nil, err
		}

		client, err := newKustoClient(addr)
		if err != nil {
			return nil, nil, err
		}

		uploaders = append(uploaders, adx.NewUploader(client, adx.UploaderOpts{
			StorageDir:        storageDir,
			Database:          database,
			ConcurrentUploads: concurrentUploads,
			DefaultMapping:    defaultMapping,
			SampleType:        sampleType,
		}))
		uploadDatabaseNames = append(uploadDatabaseNames, database)
	}

	return uploaders, uploadDatabaseNames, nil
}

func newLogger() *log.Logger {
	return log.New(&writerAdapter{os.Stderr}, "", log.LstdFlags)
}

// writerAdapter filters noisy connection errors out of the http.Server error log.
type writerAdapter struct {
	io.Writer
}

var (
	tlsHandshakeError = []byte("http: TLS handshake error")
	ioEOFError        = []byte("EOF")
)

func (a *writerAdapter) Write(data []byte) (int, error) {
	// Ignore TLS handshake errors that result in just a closed connection. Load balancer
	// health checks will cause this error to be logged.
	if (bytes.Contains(data, tlsHandshakeError) && bytes.Contains(data, ioEOFError)) ||
		bytes.Contains(data, []byte("connection reset by peer")) {
		return len(data), nil
	}
	return a.Writer.Write(data)
}
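
// Example local invocation (a minimal sketch; the database names, endpoint URL
// and storage path below are illustrative and not taken from this file):
//
//	ingestor \
//	  --storage-dir=/var/lib/adx-mon \
//	  --metrics-kusto-endpoints=Metrics=https://example.kusto.windows.net \
//	  --logs-kusto-endpoints=Logs=https://example.kusto.windows.net
//
// If no kusto endpoints or kube config are provided, the process falls back to
// fake uploaders and a fake kube client, which is useful for local testing.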