loadgen/eventhandler.go (93 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. // Package loadgen contains code for generating load based on real agent data. package loadgen import ( "embed" "fmt" "math/rand" "path/filepath" "go.uber.org/zap" "go.uber.org/zap/zapcore" "golang.org/x/time/rate" "github.com/elastic/apm-perf/loadgen/eventhandler" "github.com/elastic/apm-perf/pkg/supportedstacks" "go.elastic.co/apm/v2/transport" ) // events holds the current stored events. // //go:embed events/*.ndjson var events embed.FS type EventHandlerParams struct { Logger *zap.Logger Path string URL string Token string APIKey string Limiter *rate.Limiter Rand *rand.Rand IgnoreErrors bool RunForever bool RewriteIDs bool RewriteServiceNames bool RewriteServiceNodeNames bool RewriteServiceTargetNames bool RewriteSpanNames bool RewriteTransactionNames bool RewriteTransactionTypes bool RewriteTimestamps bool // Headers contains HTTP headers shipped with all requests. // NOTE: these headers are not sanitized in logs. Headers map[string]string // One of: apm/http, otlp/http // NOTE: otlp/grpc is not supported Protocol string // One of: any, logs, metrics, traces // NOTE: for Protocol apm/http there is no difference // between each value. When using Protocol otlp/http // each data type requires a separate EventHandler. Datatype string supportedstacks.TargetStackVersion } func (e EventHandlerParams) MarshalLogObject(enc zapcore.ObjectEncoder) error { // NOTE: Logger is ignored. enc.AddString("path", e.Path) enc.AddString("url", e.URL) enc.AddString("token", "REDACTED") enc.AddString("api_key", "REDACTED") // FIXME: add Limiter. // FIXME: add Rand. enc.AddBool("ignore_errors", e.IgnoreErrors) enc.AddBool("rewrite_ids", e.RewriteIDs) enc.AddBool("rewrite_service_names", e.RewriteServiceNames) enc.AddBool("rewrite_service_node_names", e.RewriteServiceNodeNames) enc.AddBool("rewrite_service_target_names", e.RewriteServiceTargetNames) enc.AddBool("rewrite_span_names", e.RewriteSpanNames) enc.AddBool("rewrite_transaction_names", e.RewriteTransactionNames) enc.AddBool("rewrite_transaction_tpes", e.RewriteTransactionTypes) enc.AddBool("rewrite_timestamps", e.RewriteTimestamps) for k, v := range e.Headers { enc.AddString(k, v) } return nil } // NewEventHandler creates a eventhandler which loads the files matching the // passed regex. func NewEventHandler(p EventHandlerParams) (*eventhandler.Handler, error) { if p.Logger == nil { return nil, fmt.Errorf("nil logger in params") } switch p.Protocol { case "apm/http": return newAPMEventHandler(p) case "otlp/http": // TODO: support OTLP event handling // switch p.Datatype { // case "logs": // case "metrics": // case "traces": // } return nil, fmt.Errorf("invalid datatype (%s) for protocol (%s)", p.Datatype, p.Protocol) } return nil, fmt.Errorf("invalid or unsupported protocol (%s)", p.Protocol) } func newAPMEventHandler(p EventHandlerParams) (*eventhandler.Handler, error) { // We call the HTTPTransport constructor to avoid copying all the config // parsing that creates the `*http.Client`. t, err := transport.NewHTTPTransport(transport.HTTPTransportOptions{}) if err != nil { return nil, fmt.Errorf("cannot create HTTP transport: %w", err) } c := eventhandler.Config{ Path: filepath.Join("events", p.Path), Transport: eventhandler.NewAPMTransport(p.Logger, t.Client, p.URL, p.Token, p.APIKey, p.Headers), Storage: events, Limiter: p.Limiter, Rand: p.Rand, IgnoreErrors: p.IgnoreErrors, RunForever: p.RunForever, RewriteIDs: p.RewriteIDs, RewriteServiceNames: p.RewriteServiceNames, RewriteServiceNodeNames: p.RewriteServiceNodeNames, RewriteServiceTargetNames: p.RewriteServiceTargetNames, RewriteSpanNames: p.RewriteSpanNames, RewriteTransactionNames: p.RewriteTransactionNames, RewriteTransactionTypes: p.RewriteTransactionTypes, RewriteTimestamps: p.RewriteTimestamps, TargetStackVersion: p.TargetStackVersion, } return eventhandler.NewAPM(p.Logger, c) }