cmd/apmtelemetrygen/main.go (168 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package main import ( "bytes" "context" "errors" "fmt" "math/rand" "os" "os/signal" "runtime" "sort" "strings" "syscall" "time" "github.com/spf13/cobra" "github.com/spf13/pflag" "go.elastic.co/ecszap" "go.uber.org/zap" "go.uber.org/zap/zapcore" "github.com/elastic/apm-perf/internal/version" "github.com/elastic/apm-perf/loadgen" ) const envVarPrefix = "ELASTIC_APM_" func main() { ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer cancel() // Register root command in cobra var rootCmd = &cobra.Command{ Use: "apmtelemetrygen", TraverseChildren: true, PersistentPreRunE: func(cmd *cobra.Command, _ []string) error { var err error cmd.Flags().VisitAll(func(flag *pflag.Flag) { optionName := strings.ToUpper(flag.Name) optionName = strings.ReplaceAll(optionName, "-", "_") envVar := envVarPrefix + optionName if val, ok := os.LookupEnv(envVar); !flag.Changed && ok { if flagErr := flag.Value.Set(val); flagErr != nil { err = fmt.Errorf("invalid environment variable %s: %w", envVar, flagErr) } } }) return err }, } rootCmd.AddCommand(newRunCmd()) rootCmd.AddCommand(&cobra.Command{ Use: "version", Short: "Show current version info", Run: func(cmd *cobra.Command, _ []string) { var buf bytes.Buffer fmt.Fprintf(&buf, "%s %s", version.CommitSha(), version.BuildTime()) fmt.Fprintf(cmd.OutOrStdout(), "%s version %s (%s/%s) [%s]\n", rootCmd.Name(), version.Version, runtime.GOOS, runtime.GOARCH, buf.String(), ) }, }) // Execute commands if err := rootCmd.ExecuteContext(ctx); err != nil { if !errors.Is(err, context.Canceled) { fmt.Println(err) } } } type headersFlag map[string]string func (f headersFlag) String() string { keys := make([]string, 0, len(f)) for k := range f { keys = append(keys, k) } sort.Strings(keys) for i, k := range keys { keys[i] = fmt.Sprintf("%s=%s", k, f[k]) } return strings.Join(keys, ",") } func (f headersFlag) Set(s string) error { k, v, ok := strings.Cut(s, "=") if !ok { return fmt.Errorf("expected k=v, got %q", s) } f[k] = v return nil } func (f headersFlag) Type() string { return "k=v" } func newRunCmd() *cobra.Command { options := &runOptions{Headers: make(map[string]string)} cmd := cobra.Command{ Use: "run", Short: "Runs the load generator for APM telemetry data", RunE: func(cmd *cobra.Command, args []string) error { logger := getLogger(options.Loglevel) config, err := options.toEventHandlerParams(logger) if err != nil { logger.Fatal("Failed to parse flags", zap.Error(err)) } lg, err := loadgen.NewEventHandler(config) if err != nil { logger.Fatal("Failed to create event handler", zap.Error(err)) } for i := 0; i < options.Iterations; i++ { if _, err := lg.SendBatches(cmd.Context()); err != nil { if !options.IgnoreErrors { return err } logger.Error("Failed to send batches", zap.Error(err)) } } return nil }, } cmd.Flags().Var(headersFlag(options.Headers), "header", "Extra headers to send. Can be specified multiple times") cmd.Flags().StringVar(&options.ServerURL, "server-url", "", "Server URL (default http://127.0.0.1:8200)") cmd.Flags().StringVar(&options.SecretToken, "secret-token", "", "Secret token for APM Server. Managed intake service doesn't support secret token") cmd.Flags().StringVar(&options.APIKey, "api-key", "", "API key to use for authentication") cmd.Flags().StringVar(&options.Loglevel, "log-level", "debug", "Specify the log level to use when running this command. Supported values: debug, info, warn, error") cmd.Flags().StringVar(&options.Protocol, "protocol", "apm/http", "Specify the protocol to use when sending events. Supported values: apm/http, otlp/http") cmd.Flags().StringVar(&options.Datatype, "data-type", "any", "Specify the data type to use when sending events. Supported values: any, logs, metrics, traces") cmd.Flags().StringVar(&options.EventRate, "event-rate", "0/s", "Must be in the format <number of events>/<time>. <time> is parsed") cmd.Flags().IntVar(&options.Iterations, "iterations", 1, "The number of times to replay the canned data for") cmd.Flags().BoolVar(&options.IgnoreErrors, "ignore-errors", false, "Ignore HTTP errors while sending events") cmd.Flags().BoolVar(&options.RewriteIDs, "rewrite-ids", true, "Enable or disable rewriting IDs of stored events in ouput.") cmd.Flags().BoolVar(&options.RewriteTimestamps, "rewrite-timestamps", true, "Enable or disable rewriting timestamps of stored events in ouput.") return &cmd } type runOptions struct { Headers map[string]string ServerURL string SecretToken string APIKey string Loglevel string Protocol string Datatype string EventRate string Iterations int IgnoreErrors bool // Specific if to rewrite IDs in stored events. // This allows replaying the data with no variance on event IDs. // Useful for troubleshooting. RewriteIDs bool // Specific if to rewrite timestamps in stored events. // This allows replaying the data with no variance on event timestamps. // Useful for troubleshooting. RewriteTimestamps bool } func (opts *runOptions) toEventHandlerParams(logger *zap.Logger) (loadgen.EventHandlerParams, error) { burst, interval, err := loadgen.ParseEventRate(opts.EventRate) if err != nil { return loadgen.EventHandlerParams{}, err } return loadgen.EventHandlerParams{ Logger: logger, Path: "apm*.ndjson", URL: opts.ServerURL, Token: opts.SecretToken, APIKey: opts.APIKey, Headers: opts.Headers, IgnoreErrors: opts.IgnoreErrors, Protocol: opts.Protocol, Datatype: opts.Datatype, Limiter: loadgen.GetNewLimiter(burst, interval), Rand: rand.New(rand.NewSource(time.Now().UnixNano())), RewriteIDs: opts.RewriteIDs, RewriteTimestamps: opts.RewriteTimestamps, }, nil } func getLogger(logLevel string) *zap.Logger { level, err := zapcore.ParseLevel(logLevel) if err != nil { level = zap.InfoLevel } return zap.New(ecszap.NewCore( ecszap.NewDefaultEncoderConfig(), os.Stdout, level, ), zap.AddCaller()) }