loadgen/cmd/otelbench/main.go
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package main

import (
	"context"
	"flag"
	"fmt"
	"os"
	"os/signal"
	"testing"
	"time"

	"github.com/elastic/opentelemetry-collector-components/receiver/loadgenreceiver"
)

// getSignals returns a slice of signal names to be benchmarked according to Config.
func getSignals() (signals []string) {
if Config.Logs {
signals = append(signals, "logs")
}
if Config.Metrics {
signals = append(signals, "metrics")
}
if Config.Traces {
signals = append(signals, "traces")
}
return
}

// getExporters returns a slice of exporter names to be benchmarked according to Config.
func getExporters() (exporters []string) {
if Config.ExporterOTLP {
exporters = append(exporters, "otlp")
}
if Config.ExporterOTLPHTTP {
exporters = append(exporters, "otlphttp")
}
return
}
func fullBenchmarkName(signal, exporter string, concurrency int) string {
return fmt.Sprintf("BenchmarkOTelbench/%s-%s-%d", signal, exporter, concurrency)
}
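
// runBench runs a single benchmark for the given signal and exporter at the
// given concurrency and returns its result. The reporter callback is invoked
// after the run so that callers can attach additional metrics to b.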
func runBench(ctx context.Context, signal, exporter string, concurrency int, reporter func(b *testing.B)) testing.BenchmarkResult {
return testing.Benchmark(func(b *testing.B) {
		// loadgenreceiver sends stats about the generated telemetry once it
		// has finished sending b.N iterations.
logsDone := make(chan loadgenreceiver.Stats)
metricsDone := make(chan loadgenreceiver.Stats)
tracesDone := make(chan loadgenreceiver.Stats)
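		// Close the channels for signals that are not benchmarked so that the
		// stats goroutine below does not block on them; receiving from a
		// closed channel yields a zero-value Stats immediately.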
if signal != "logs" {
close(logsDone)
}
if signal != "metrics" {
close(metricsDone)
}
if signal != "traces" {
close(tracesDone)
}
stop := make(chan struct{}) // close channel to stop the loadgen collector
done := make(chan struct{}) // close channel to exit benchmark after stats were reported
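		// Wait for all three signals to report, then stop the timer, stop the
		// collector, and report throughput relative to the elapsed benchmark time.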
go func() {
logsStats := <-logsDone
metricsStats := <-metricsDone
tracesStats := <-tracesDone
b.StopTimer()
stats := logsStats.Add(metricsStats).Add(tracesStats)
elapsedSeconds := b.Elapsed().Seconds()
close(stop)
b.ReportMetric(float64(stats.LogRecords)/elapsedSeconds, "logs/s")
b.ReportMetric(float64(stats.MetricDataPoints)/elapsedSeconds, "metric_points/s")
b.ReportMetric(float64(stats.Spans)/elapsedSeconds, "spans/s")
b.ReportMetric(float64(stats.Requests)/elapsedSeconds, "requests/s")
b.ReportMetric(float64(stats.FailedLogRecords)/elapsedSeconds, "failed_logs/s")
b.ReportMetric(float64(stats.FailedMetricDataPoints)/elapsedSeconds, "failed_metric_points/s")
b.ReportMetric(float64(stats.FailedSpans)/elapsedSeconds, "failed_spans/s")
b.ReportMetric(float64(stats.FailedRequests)/elapsedSeconds, "failed_requests/s")
reporter(b)
close(done)
}()
err := RunCollector(ctx, stop, configs(exporter, signal, b.N, concurrency), logsDone, metricsDone, tracesDone)
if err != nil {
fmt.Fprintln(os.Stderr, err.Error())
b.Fatal(err)
}
<-done
})
}

func main() {
	// Register testing flags (e.g. -test.benchtime) before our own flags are parsed,
	// since testing.Benchmark is called outside of "go test".
	testing.Init()
if err := Init(); err != nil {
fmt.Fprintln(os.Stderr, err)
flag.Usage()
os.Exit(2)
}
flag.Parse()
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
// TODO(carsonip): configurable warm up
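	// Compute the longest benchmark name so results can be printed in aligned columns.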
var maxLen int
for _, concurrency := range Config.ConcurrencyList {
for _, signal := range getSignals() {
for _, exporter := range getExporters() {
maxLen = max(maxLen, len(fullBenchmarkName(signal, exporter, concurrency)))
}
}
}
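	// The fetcher optionally augments results with remote stats from Elasticsearch
	// telemetry; when the error is ignorable, the run continues without a fetcher.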
fetcher, ignore, err := newElasticsearchStatsFetcher(elasticsearchTelemetryConfig(Config.Telemetry))
if err != nil {
fmt.Fprintln(os.Stderr, err)
if !ignore {
os.Exit(2)
}
}
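	// Run every signal/exporter/concurrency combination as its own benchmark.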
for _, concurrency := range Config.ConcurrencyList {
for _, signal := range getSignals() {
for _, exporter := range getExporters() {
benchName := fullBenchmarkName(signal, exporter, concurrency)
t := time.Now().UTC()
result := runBench(ctx, signal, exporter, concurrency, func(b *testing.B) {
if fetcher == nil {
return
}
// after each run wait a bit to capture late metric arrivals
time.Sleep(10 * time.Second)
stats, err := fetcher.FetchStats(ctx, t, time.Now().UTC())
if err != nil {
						fmt.Fprintf(os.Stderr, "error while fetching remote stats: %s\n", err)
return
}
for unit, n := range stats {
b.ReportMetric(n, unit)
}
})
// write benchmark result to stdout, as stderr may be cluttered with collector logs
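				// A result line looks roughly like (numbers illustrative):
				//   BenchmarkOTelbench/logs-otlp-1 	       5	 240000000 ns/op	 10000 logs/s	...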
fmt.Printf("%-*s\t%s\n", maxLen, benchName, result.String())
				// return early if the context was canceled (e.g. on interrupt)
select {
case <-ctx.Done():
return
default:
}
}
}
}
}
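
// configs assembles the collector config file list for a benchmark run: the
// base config, exporter-specific overrides, iteration and concurrency
// settings, plus overrides that disable pipelines for signals not under test.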
func configs(exporter, signal string, iterations, concurrency int) (configFiles []string) {
configFiles = append(configFiles, Config.CollectorConfigPath)
configFiles = append(configFiles, ExporterConfigs(exporter)...)
configFiles = append(configFiles, SetIterations(iterations)...)
configFiles = append(configFiles, SetConcurrency(concurrency)...)
for _, s := range []string{"logs", "metrics", "traces"} {
// Disable pipelines not relevant to the benchmark by overriding receiver and exporter to nop
if signal != s {
configFiles = append(configFiles, DisableSignal(s)...)
}
}
return
}