systemtest/benchtest/main.go (233 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package benchtest import ( "context" "crypto/tls" "errors" "flag" "fmt" "log" "net/http" "os" "reflect" "runtime" "sort" "strings" "sync" "testing" "time" "go.elastic.co/apm/v2/stacktrace" "go.uber.org/zap" "go.uber.org/zap/zaptest" "golang.org/x/time/rate" "github.com/elastic/apm-perf/loadgen" loadgencfg "github.com/elastic/apm-perf/loadgen/config" "github.com/elastic/apm-server/systemtest/benchtest/expvar" ) const waitInactiveTimeout = 60 * time.Second // BenchmarkFunc is the benchmark function type accepted by Run. type BenchmarkFunc func(*testing.B, *rate.Limiter) const benchmarkFuncPrefix = "Benchmark" type benchmark struct { name string f BenchmarkFunc } func runBenchmark(f BenchmarkFunc) (testing.BenchmarkResult, bool, bool, error) { // Run the benchmark. testing.Benchmark will invoke the function // multiple times, but only returns the final result. var failed bool var skipped bool var collector *expvar.Collector result := testing.Benchmark(func(b *testing.B) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() var err error server := loadgencfg.Config.ServerURL.String() collector, err = expvar.StartNewCollector(ctx, server, 100*time.Millisecond, zaptest.NewLogger(b)) if err != nil { b.Error(err) failed = b.Failed() return } limiter := loadgen.GetNewLimiter(loadgencfg.Config.EventRate.Burst, loadgencfg.Config.EventRate.Interval) b.ResetTimer() signal := make(chan bool) // f can panic or call runtime.Goexit, stopping the goroutine. // When that happens the function won't return and ok=false will // be returned, making the benchmark looks like failure. go func() { // Signal that we're done whether we return normally // or by SkipNow/FailNow's runtime.Goexit. defer func() { signal <- true }() f(b, limiter) }() <-signal if !b.Failed() { watcher, err := collector.WatchMetric(expvar.ActiveEvents, 0) if err != nil { b.Error(err) } else if status := <-watcher; !status { b.Error("failed to wait for APM server to be inactive") } } failed = b.Failed() skipped = b.Skipped() }) if result.Extra != nil { addExpvarMetrics(&result, collector, benchConfig.Detailed) } return result, failed, skipped, nil } func addExpvarMetrics(result *testing.BenchmarkResult, collector *expvar.Collector, detailed bool) { result.Bytes = collector.Delta(expvar.Bytes) result.MemAllocs = uint64(collector.Delta(expvar.MemAllocs)) result.MemBytes = uint64(collector.Delta(expvar.MemBytes)) result.Extra["events/sec"] = float64(collector.Delta(expvar.TotalEvents)) / result.T.Seconds() if detailed { result.Extra["intake_events_accepted/sec"] = float64(collector.Delta(expvar.IntakeEventsAccepted)) / result.T.Seconds() result.Extra["intake_events_errors/sec"] = float64(collector.Delta(expvar.IntakeEventsErrorsInvalid)+collector.Delta(expvar.IntakeEventsErrorsTooLarge)) / result.T.Seconds() result.Extra["txs/sec"] = float64(collector.Delta(expvar.TransactionsProcessed)) / result.T.Seconds() result.Extra["spans/sec"] = float64(collector.Delta(expvar.SpansProcessed)) / result.T.Seconds() result.Extra["metrics/sec"] = float64(collector.Delta(expvar.MetricsProcessed)) / result.T.Seconds() result.Extra["errors/sec"] = float64(collector.Delta(expvar.ErrorsProcessed)) / result.T.Seconds() result.Extra["gc_cycles"] = float64(collector.Delta(expvar.NumGC)) result.Extra["max_rss"] = float64(collector.Get(expvar.RSSMemoryBytes).Max) result.Extra["max_goroutines"] = float64(collector.Get(expvar.Goroutines).Max) result.Extra["max_heap_alloc"] = float64(collector.Get(expvar.HeapAlloc).Max) result.Extra["max_heap_objects"] = float64(collector.Get(expvar.HeapObjects).Max) result.Extra["mean_available_indexers"] = float64(collector.Get(expvar.AvailableBulkRequests).Mean) result.Extra["tbs_lsm_size"] = float64(collector.Get(expvar.TBSLsmSize).Max) result.Extra["tbs_vlog_size"] = float64(collector.Get(expvar.TBSVlogSize).Max) } // Record the number of error responses returned by the server: lower is better. errorResponses := collector.Delta(expvar.ErrorElasticResponses) + collector.Delta(expvar.ErrorOTLPTracesResponses) + collector.Delta(expvar.ErrorOTLPMetricsResponses) if detailed || errorResponses > 0 { result.Extra["error_responses/sec"] = float64(errorResponses) / result.T.Seconds() } } func fullBenchmarkName(name string, agents int) string { if agents != 1 { return fmt.Sprintf("%s-%d", name, agents) } return name } func benchmarkFuncName(f BenchmarkFunc) (string, error) { ffunc := runtime.FuncForPC(reflect.ValueOf(f).Pointer()) if ffunc == nil { return "", errors.New("runtime.FuncForPC returned nil") } fullName := ffunc.Name() _, name := stacktrace.SplitFunctionName(fullName) if !strings.HasPrefix(name, benchmarkFuncPrefix) { return "", fmt.Errorf("benchmark function names must begin with %q (got %q)", fullName, benchmarkFuncPrefix) } return name, nil } // Run runs the given benchmarks according to the flags defined. // // Run expects to receive statically-defined functions whose names // are all prefixed with "Benchmark", like those that are designed // to work with "go test". func Run(allBenchmarks ...BenchmarkFunc) error { // Set flags in package testing. testing.Init() if err := flag.Set("test.benchtime", benchConfig.Benchtime.String()); err != nil { return err } // Sets the http.DefaultClient.Transport.TLSClientConfig.InsecureSkipVerify // to match the "-secure" flag value. verifyTLS := loadgencfg.Config.Secure http.DefaultClient.Transport = &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: !verifyTLS}, } os.Setenv("ELASTIC_APM_VERIFY_SERVER_CERT", fmt.Sprint(verifyTLS)) var profiles profiles if err := profiles.init(); err != nil { return err } defer func() { if err := profiles.writeProfiles(); err != nil { log.Printf("failed to write profiles: %s", err) } }() matchRE := benchConfig.RunRE benchmarks := make([]benchmark, 0, len(allBenchmarks)) for _, benchmarkFunc := range allBenchmarks { name, err := benchmarkFuncName(benchmarkFunc) if err != nil { return err } if matchRE == nil || matchRE.MatchString(name) { benchmarks = append(benchmarks, benchmark{ name: name, f: benchmarkFunc, }) } } sort.Slice(benchmarks, func(i, j int) bool { return benchmarks[i].name < benchmarks[j].name }) var maxLen int agentsList := benchConfig.AgentsList for _, agents := range agentsList { for _, benchmark := range benchmarks { if n := len(fullBenchmarkName(benchmark.name, agents)); n > maxLen { maxLen = n } } } // Warm up the APM Server with the specified `-agents`. Only the first // value in the list will be used. if len(agentsList) > 0 && benchConfig.WarmupTime.Seconds() > 0 { agents := agentsList[0] serverURL := loadgencfg.Config.ServerURL.String() secretToken := loadgencfg.Config.SecretToken if err := warmup(agents, benchConfig.WarmupTime, serverURL, secretToken); err != nil { return fmt.Errorf("warm-up failed with %d agents: %v", agents, err) } } for _, agents := range agentsList { runtime.GOMAXPROCS(int(agents)) for _, benchmark := range benchmarks { name := fullBenchmarkName(benchmark.name, agents) for i := 0; i < int(benchConfig.Count); i++ { profileChan := profiles.record(name) result, failed, skipped, err := runBenchmark(benchmark.f) if err != nil { return err } if skipped { continue } if failed { fmt.Fprintf(os.Stderr, "--- FAIL: %s\n", name) return fmt.Errorf("benchmark %q failed", name) } else { fmt.Fprintf(os.Stderr, "%-*s\t%s\t%s\n", maxLen, name, result, result.MemString()) } if err := <-profileChan; err != nil { return err } } } } return nil } // warmup sends events to the remote APM Server using the specified number of // agents for the specified duration. func warmup(agents int, duration time.Duration, url, token string) error { rl := loadgen.GetNewLimiter(loadgencfg.Config.EventRate.Burst, loadgencfg.Config.EventRate.Interval) h, err := loadgen.NewEventHandler(loadgen.EventHandlerParams{ Logger: zap.NewNop(), Protocol: "apm/http", Path: `apm-*.ndjson`, URL: url, Token: token, Limiter: rl, }) if err != nil { return fmt.Errorf("unable to create warm-up handler: %w", err) } ctx, cancel := context.WithTimeout(context.Background(), duration) defer cancel() var wg sync.WaitGroup wg.Add(agents) for i := 0; i < agents; i++ { go func() { defer wg.Done() sendErr := h.SendBatchesInLoop(ctx) if sendErr != nil && !errors.Is(sendErr, context.DeadlineExceeded) { log.Printf("failed to send batches: %v", sendErr) } }() } wg.Wait() ctx, cancel = context.WithTimeout(context.Background(), waitInactiveTimeout) defer cancel() if err := expvar.WaitUntilServerInactive(ctx, url); err != nil { return fmt.Errorf("received error waiting for server inactive: %w", err) } return nil }