cmd/apmbench/main.go (129 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package main
import (
"context"
"flag"
"fmt"
"log"
"testing"
"time"
"go.uber.org/zap"
)
// main parses the command-line flags, builds the logger and runs every
// benchmark listed below through Run.
//
// When cfg.BenchmarkTelemetryEndpoint is set, two hooks are passed to Run:
//   - extraMetrics first waits for the cleanup state (see assertCleanupState).
//     It then reports every metric returned by the telemetry endpoint, summed
//     across grouping keys, as a per-unit benchmark metric.
//   - resetStoreFunc resets the telemetry store.
//
// Telemetry failures are only logged as warnings. A failure returned by Run
// is fatal.
func main() {
	flag.Parse()
	logger, err := setupLogger()
	if err != nil {
		log.Fatalf("failed to setup logger: %v", err)
	}
	// Flush buffered log entries on a normal return. The error is deliberately
	// ignored: syncing stderr can fail on some platforms and nothing useful
	// can be done about it at exit. (logger.Fatal exits without running this.)
	defer func() { _ = logger.Sync() }()

	extraMetrics := func(*testing.B) {}
	resetStoreFunc := func() {}
	if cfg.BenchmarkTelemetryEndpoint != "" {
		// Named `store` rather than `telemetry` so the variable does not
		// shadow the telemetry type for the rest of this scope.
		store := telemetry{endpoint: cfg.BenchmarkTelemetryEndpoint}
		extraMetrics = func(b *testing.B) {
			// TODO (lahsivjar): get a context with timeout based on test timeout
			// Until then, this call can block indefinitely if the cleanup keys
			// never reach zero, because context.Background is never done.
			if err := assertCleanupState(context.Background(), store, logger); err != nil {
				logger.Warn(
					"failed to get cleanup metric, continuing without cleanup",
					zap.Error(err),
				)
			}
			m, err := store.GetAll()
			if err != nil {
				logger.Warn(
					"failed to retrieve benchmark metrics, extra metrics will not be reported",
					zap.Error(err),
				)
				return
			}
			// Extra metrics may be aggregated by a grouping key. Summing all
			// the values for a unit gives the final benchmark result.
			for unit, grp := range m {
				var total float64
				for _, val := range grp {
					total += val
				}
				b.ReportMetric(total, unit)
			}
		}
		resetStoreFunc = func() {
			if err := store.Reset(); err != nil {
				logger.Warn(
					"failed to reset store, benchmark report may be corrupted",
					zap.Error(err),
				)
			}
		}
	}
	// Run benchmarks
	if err := Run(
		extraMetrics,
		resetStoreFunc,
		Benchmark1000Transactions,
		BenchmarkAgentAll,
		BenchmarkAgentGo,
		BenchmarkAgentNodeJS,
		BenchmarkAgentPython,
		BenchmarkAgentRuby,
		Benchmark10000AggregationGroups,
		BenchmarkOTLPTraces,
		BenchmarkOTLPLogs,
		BenchmarkOTLPMetrics,
	); err != nil {
		logger.Fatal("failed to run benchmarks", zap.Error(err))
	}
	logger.Info("finished running benchmarks")
}
// setupLogger builds a logger from zap's production configuration. When
// cfg.Debug is set, the level is lowered to debug so that logger.Debug
// output is emitted as well.
func setupLogger() (*zap.Logger, error) {
	conf := zap.NewProductionConfig()
	if cfg.Debug {
		// conf.Level is a fresh AtomicLevel created by NewProductionConfig,
		// so adjusting it in place only affects the logger built below.
		conf.Level.SetLevel(zap.DebugLevel)
	}
	return conf.Build()
}
// assertCleanupState blocks until every metric named in cfg.CleanupKeys
// reports zero for all of its groups, or until ctx is done. It polls the
// telemetry store every five seconds, and the first poll happens one interval
// after the call.
//
// Keys are checked in order. A key that has been satisfied is never
// re-checked. A key whose lookup fails, or that still has a non-zero group,
// ends the current poll, and that key is retried on the next tick.
//
// It returns nil once all keys are satisfied, and a wrapped ctx.Err() if ctx
// is done first. When no cleanup keys are configured, it logs a warning and
// returns nil immediately without waiting.
func assertCleanupState(ctx context.Context, telemetry telemetry, logger *zap.Logger) error {
	if len(cfg.CleanupKeys) == 0 {
		// Without cleanup keys there is nothing to wait on: the function
		// returns right away, so in-flight work from the benchmark may still
		// be reflected in the reported metrics.
		logger.Warn("no cleanup keys specified, benchmark results may get corrupted")
		return nil
	}
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	// next is the index of the first cleanup key not yet asserted.
	next := 0
	for next < len(cfg.CleanupKeys) {
		select {
		case <-ctx.Done():
			return fmt.Errorf("cleanup condition not satisfied: %w", ctx.Err())
		case <-ticker.C:
			// Advance through consecutive satisfied keys. Stop at the first
			// one that is not satisfied, since later keys need not be tried.
			for next < len(cfg.CleanupKeys) && cleanupKeySatisfied(telemetry, cfg.CleanupKeys[next], logger) {
				next++
			}
		}
	}
	logger.Debug("cleanup condition satisfied")
	return nil
}

// cleanupKeySatisfied reports whether every group of the telemetry metric
// key currently has a zero value. A lookup error is logged and reported as
// unsatisfied, so the caller retries the key later.
func cleanupKeySatisfied(store telemetry, key string, logger *zap.Logger) bool {
	m, err := store.Get(key)
	if err != nil {
		logger.Warn(
			"failed to get cleanup metric, will be retried",
			zap.Error(err),
			zap.String("key", key),
		)
		return false
	}
	for gid, v := range m {
		if v != 0 {
			logger.Debug(
				"cleanup condition not satisfied, will be retried",
				zap.String("key", key),
				zap.String("group", gid),
				zap.Float64("val", v),
			)
			return false
		}
	}
	return true
}
// init registers the testing package's command-line flags (-test.*) so that
// the flag.Parse call in main accepts them alongside the tool's own flags.
func init() { testing.Init() }