// systemtest/cmd/apmbench/main.go
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package main
import (
"context"
"embed"
"flag"
"fmt"
"log"
"runtime/debug"
"testing"
"time"
"go.elastic.co/apm/v2"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"golang.org/x/time/rate"
"github.com/elastic/apm-server/systemtest/benchtest"
"github.com/elastic/go-sysinfo"
)
// Benchmark1000Transactions sends batches of 1000 transactions per
// benchmark iteration through an Elastic APM Go agent tracer, flushing
// the tracer after each batch.
func Benchmark1000Transactions(b *testing.B, l *rate.Limiter) {
	b.RunParallel(func(pb *testing.PB) {
		// One tracer per parallel goroutine.
		tracer := benchtest.NewTracer(b)
		for pb.Next() {
			for count := 0; count < 1000; count++ {
				if err := l.Wait(context.Background()); err != nil {
					b.Fatal(err)
				}
				tx := tracer.StartTransaction("name", "type")
				tx.End()
			}
			// TODO(axw) implement a transport that enables streaming
			// events in a way that we can block when the queue is full,
			// without flushing. Alternatively, make this an option in
			// TracerOptions?
			tracer.Flush(nil)
		}
	})
}
// BenchmarkOTLPTraces generates one OTLP span per iteration and exports
// them to the APM Server via the OTLP exporter.
func BenchmarkOTLPTraces(b *testing.B, l *rate.Limiter) {
	b.RunParallel(func(pb *testing.PB) {
		exporter := benchtest.NewOTLPExporter(b)
		tracerProvider := sdktrace.NewTracerProvider(
			sdktrace.WithSampler(sdktrace.AlwaysSample()),
			// Block span submission when the batch queue is full rather
			// than dropping spans, so every generated span is exported.
			sdktrace.WithBatcher(exporter, sdktrace.WithBlocking()),
		)
		tracer := tracerProvider.Tracer("tracer")
		for pb.Next() {
			if err := l.Wait(context.Background()); err != nil {
				b.Fatal(err)
			}
			_, span := tracer.Start(context.Background(), "name")
			span.End()
		}
		// Flush buffered spans before the benchmark completes. A failed
		// flush means events were silently dropped, which would skew the
		// benchmark results, so report it instead of ignoring the error.
		if err := tracerProvider.ForceFlush(context.Background()); err != nil {
			b.Fatal(err)
		}
	})
}
// BenchmarkAgentAll matches all the agent event files, allowing the benchmark
// to contain a wider mix of events compared to the other BenchmarkAgent<Name>
// benchmarks. The objective is to measure how the APM Server performs when it
// receives events from multiple agents.
//
// Even though files are loaded alphabetically and the events sent sequentially
// there is inherent randomness in the order the events are sent to APM Server.
func BenchmarkAgentAll(b *testing.B, l *rate.Limiter) {
	benchmarkAgent(b, l, `apm-*.ndjson`)
}
// BenchmarkAgentGo replays the recorded Go agent event files.
func BenchmarkAgentGo(b *testing.B, l *rate.Limiter) {
	benchmarkAgent(b, l, `apm-go*.ndjson`)
}
// BenchmarkAgentNodeJS replays the recorded Node.js agent event files.
func BenchmarkAgentNodeJS(b *testing.B, l *rate.Limiter) {
	benchmarkAgent(b, l, `apm-nodejs*.ndjson`)
}
// BenchmarkAgentPython replays the recorded Python agent event files.
func BenchmarkAgentPython(b *testing.B, l *rate.Limiter) {
	benchmarkAgent(b, l, `apm-python*.ndjson`)
}
// BenchmarkAgentRuby replays the recorded Ruby agent event files.
func BenchmarkAgentRuby(b *testing.B, l *rate.Limiter) {
	benchmarkAgent(b, l, `apm-ruby*.ndjson`)
}
// benchmarkAgent replays the recorded agent event files matching expr,
// sending event batches in parallel until the benchmark completes.
func benchmarkAgent(b *testing.B, l *rate.Limiter, expr string) {
	h := benchtest.NewEventHandler(b, expr, l)
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			// NOTE(review): SendBatches' result is discarded here —
			// presumably failures surface via the handler/b; confirm
			// against benchtest.EventHandler.
			h.SendBatches(context.Background())
		}
	})
}
// events contains custom events that are not in apm-perf. The matching
// NDJSON files are embedded into the binary at compile time.
//
//go:embed events/*.ndjson
var events embed.FS
// BenchmarkTracesAgentAll replays the embedded trace-only event files
// for all agents.
func BenchmarkTracesAgentAll(b *testing.B, l *rate.Limiter) {
	benchmarkTracesAgent(b, l, `apm-*.ndjson`)
}
// BenchmarkTracesAgentGo replays the embedded trace-only Go agent event files.
func BenchmarkTracesAgentGo(b *testing.B, l *rate.Limiter) {
	benchmarkTracesAgent(b, l, `apm-go*.ndjson`)
}
// BenchmarkTracesAgentNodeJS replays the embedded trace-only Node.js agent
// event files.
func BenchmarkTracesAgentNodeJS(b *testing.B, l *rate.Limiter) {
	benchmarkTracesAgent(b, l, `apm-nodejs*.ndjson`)
}
// BenchmarkTracesAgentPython replays the embedded trace-only Python agent
// event files.
func BenchmarkTracesAgentPython(b *testing.B, l *rate.Limiter) {
	benchmarkTracesAgent(b, l, `apm-python*.ndjson`)
}
// BenchmarkTracesAgentRuby replays the embedded trace-only Ruby agent
// event files.
func BenchmarkTracesAgentRuby(b *testing.B, l *rate.Limiter) {
	benchmarkTracesAgent(b, l, `apm-ruby*.ndjson`)
}
// benchmarkTracesAgent benchmarks with traces only, loading the event
// files matching expr from the embedded events filesystem. Useful to
// benchmark TBS (tail-based sampling).
func benchmarkTracesAgent(b *testing.B, l *rate.Limiter, expr string) {
	h := benchtest.NewFSEventHandler(b, expr, l, events)
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			// NOTE(review): SendBatches' result is discarded, same as
			// in benchmarkAgent — confirm against benchtest.
			h.SendBatches(context.Background())
		}
	})
}
// Benchmark10000AggregationGroups benchmarks memory usage when
// aggregating high-cardinality data. Every transaction and span carries
// a unique name/type/destination, generating a lot of groups for
// service transaction metrics, transaction metrics, and service
// destination metrics.
//
// b.N is used instead of b.RunParallel since this benchmark is about
// memory usage.
//
// If a rate limiter is used, it is possible that part of the 10k
// transactions will not fit into the same 1m aggregation period, and
// this will cause a lower observed memory usage.
func Benchmark10000AggregationGroups(b *testing.B, l *rate.Limiter) {
	for iteration := 0; iteration < b.N; iteration++ {
		tracer := benchtest.NewTracer(b)
		for i := 0; i < 10000; i++ {
			if err := l.Wait(context.Background()); err != nil {
				b.Fatal(err)
			}
			// Unique identifiers per event force distinct aggregation groups.
			name := fmt.Sprintf("name%d", i)
			typ := fmt.Sprintf("type%d", i)
			tx := tracer.StartTransaction(name, typ)
			span := tx.StartSpanOptions(name, typ, apm.SpanOptions{})
			span.Context.SetDestinationService(apm.DestinationServiceSpanContext{
				Name:     name,
				Resource: fmt.Sprintf("resource%d", i),
			})
			span.Duration = time.Second
			span.End()
			tx.End()
		}
		tracer.Flush(nil)
	}
}
// main caps the Go runtime's soft memory limit at 90% of the host's
// total memory, then runs the registered benchmarks.
func main() {
	flag.Parse()

	totalMemory, err := sysMemory()
	if err != nil {
		log.Fatal(err)
	}
	// Leave 10% headroom below total system memory.
	debug.SetMemoryLimit(int64(float64(totalMemory) * 0.9))

	err = benchtest.Run(
		Benchmark1000Transactions,
		BenchmarkOTLPTraces,
		BenchmarkAgentAll,
		BenchmarkAgentGo,
		BenchmarkAgentNodeJS,
		BenchmarkAgentPython,
		BenchmarkAgentRuby,
		Benchmark10000AggregationGroups,
		BenchmarkTracesAgentAll,
		BenchmarkTracesAgentGo,
		BenchmarkTracesAgentNodeJS,
		BenchmarkTracesAgentPython,
		BenchmarkTracesAgentRuby,
	)
	if err != nil {
		log.Fatal(err)
	}
}
// sysMemory returns the host's total memory in bytes, as reported by
// go-sysinfo.
func sysMemory() (uint64, error) {
	hostInfo, err := sysinfo.Host()
	if err != nil {
		return 0, err
	}
	memInfo, err := hostInfo.Memory()
	if err != nil {
		return 0, err
	}
	return memInfo.Total, nil
}