cmd/apmbench/bench.go (119 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package main import ( "context" "errors" "fmt" "net/url" "strings" "testing" "time" "go.elastic.co/apm/v2" "go.elastic.co/apm/v2/transport" "go.uber.org/zap" "golang.org/x/time/rate" "github.com/elastic/apm-perf/loadgen" loadgencfg "github.com/elastic/apm-perf/loadgen/config" "github.com/elastic/apm-perf/loadgen/eventhandler" ) func Benchmark1000Transactions(b *testing.B, l *rate.Limiter) { b.RunParallel(func(pb *testing.PB) { tracer := newTracer(b) for pb.Next() { for i := 0; i < 1000; i++ { if err := l.Wait(context.Background()); err != nil { // panicing ensures that the error is reported // see: https://github.com/golang/go/issues/32066 panic(err) } tracer.StartTransaction("name", "type").End() } // TODO(axw) implement a transport that enables streaming // events in a way that we can block when the queue is full, // without flushing. Alternatively, make this an option in // TracerOptions? tracer.Flush(nil) } }) } func BenchmarkAgentAll(b *testing.B, l *rate.Limiter) { benchmarkAgent(b, l, `apm-*.ndjson`) } func BenchmarkAgentGo(b *testing.B, l *rate.Limiter) { benchmarkAgent(b, l, `apm-go*.ndjson`) } func BenchmarkAgentNodeJS(b *testing.B, l *rate.Limiter) { benchmarkAgent(b, l, `apm-nodejs*.ndjson`) } func BenchmarkAgentPython(b *testing.B, l *rate.Limiter) { benchmarkAgent(b, l, `apm-python*.ndjson`) } func BenchmarkAgentRuby(b *testing.B, l *rate.Limiter) { benchmarkAgent(b, l, `apm-ruby*.ndjson`) } func Benchmark10000AggregationGroups(b *testing.B, l *rate.Limiter) { // Benchmark memory usage on aggregating high cardinality data. // This should generate a lot of groups for service transaction metrics, // transaction metrics, and service destination metrics. // // Using b.N instead of b.RunParallel since this benchmark is about memory // usage. // // If rate limiter is used, it is possible that part of the 10k // transactions will not fit into the same 1m aggregation period, and this // will cause a lower observed memory usage. for n := 0; n < b.N; n++ { tracer := newTracer(b) for i := 0; i < 10000; i++ { if err := l.Wait(context.Background()); err != nil { // panicing ensures that the error is reported // see: https://github.com/golang/go/issues/32066 panic(err) } tx := tracer.StartTransaction(fmt.Sprintf("name%d", i), fmt.Sprintf("type%d", i)) span := tx.StartSpanOptions(fmt.Sprintf("name%d", i), fmt.Sprintf("type%d", i), apm.SpanOptions{}) span.Context.SetServiceTarget(apm.ServiceTargetSpanContext{ Name: fmt.Sprintf("name%d", i), Type: fmt.Sprintf("resource%d", i), }) span.Duration = time.Second span.End() tx.End() } tracer.Flush(nil) } } func newTracer(tb testing.TB) *apm.Tracer { httpTransport, err := transport.NewHTTPTransport(transport.HTTPTransportOptions{ ServerURLs: []*url.URL{loadgencfg.Config.ServerURL}, APIKey: loadgencfg.Config.APIKey, SecretToken: loadgencfg.Config.SecretToken, }) if err != nil { // panicing ensures that the error is reported // see: https://github.com/golang/go/issues/32066 panic(err) } tracer, err := apm.NewTracerOptions(apm.TracerOptions{ Transport: httpTransport, }) if err != nil { // panicing ensures that the error is reported // see: https://github.com/golang/go/issues/32066 panic(err) } tb.Cleanup(tracer.Close) return tracer } func newEventHandler(tb testing.TB, p string, l *rate.Limiter) *eventhandler.Handler { protocol := "apm/http" if strings.HasPrefix(p, "otlp-") { protocol = "otlp/http" } h, err := loadgen.NewEventHandler(loadgen.EventHandlerParams{ Logger: zap.NewNop(), Path: p, Limiter: l, URL: loadgencfg.Config.ServerURL.String(), Token: loadgencfg.Config.SecretToken, APIKey: loadgencfg.Config.APIKey, IgnoreErrors: loadgencfg.Config.IgnoreErrors, RewriteIDs: loadgencfg.Config.RewriteIDs, RewriteTimestamps: loadgencfg.Config.RewriteTimestamps, Headers: loadgencfg.Config.Headers, Protocol: protocol, }) if err != nil { // panicing ensures that the error is reported // see: https://github.com/golang/go/issues/32066 panic(err) } return h } func benchmarkAgent(b *testing.B, l *rate.Limiter, expr string) { h := newEventHandler(b, expr, l) b.RunParallel(func(pb *testing.PB) { for pb.Next() { if _, err := h.SendBatches(context.Background()); err != nil { // NOTE(marclop): we are ignoring the context.DeadlineExceeded error // because we are not interested in the error itself, but in the // performance of the agent. Other errors are still reported. if !errors.Is(err, context.DeadlineExceeded) { // panicing ensures that the error is reported // see: https://github.com/golang/go/issues/32066 panic(fmt.Sprintf("failed to send batches: %+v", err)) } } } }) }