loadgen/cmd/otelbench/flags.go (248 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package main
import (
"flag"
"fmt"
"net/url"
"os"
"sort"
"strconv"
"strings"
"time"
)
var Config struct {
ServerURLOTLP *url.URL
ServerURLOTLPHTTP *url.URL
SecretToken string
APIKey string
Insecure bool
InsecureSkipVerify bool
Headers map[string]string
CollectorConfigPath string
Logs bool
Metrics bool
Traces bool
ExporterOTLP bool
ExporterOTLPHTTP bool
ConcurrencyList []int
Telemetry TelemetryConfig
}
type TelemetryConfig struct {
ElasticsearchURL []string
ElasticsearchUserName string
ElasticsearchPassword string
ElasticsearchAPIKey string
ElasticsearchTimeout time.Duration
ElasticsearchIndex string
FilterClusterName string
FilterProjectID string
Metrics []string
}
var defaultTelemetryMetrics = []string{
"otelcol_process_cpu_seconds",
"otelcol_process_memory_rss",
"otelcol_process_runtime_total_alloc_bytes",
"otelcol_process_runtime_total_sys_memory_bytes",
"otelcol_process_uptime",
}
func Init() error {
// Server config
flag.Func(
"endpoint",
"target server endpoint for both otlp and otlphttp exporters (default to value in config yaml), equivalent to setting both -endpoint-otlp and -endpoint-otlphttp",
func(server string) (err error) {
if server != "" {
Config.ServerURLOTLP, err = url.Parse(server)
Config.ServerURLOTLPHTTP = Config.ServerURLOTLP
}
return
})
flag.Func(
"endpoint-otlp",
"target server endpoint for otlp exporter (default to value in config yaml)",
func(server string) (err error) {
if server != "" {
Config.ServerURLOTLP, err = url.Parse(server)
}
return
})
flag.Func(
"endpoint-otlphttp",
"target server endpoint for otlphttp exporter (default to value in config yaml)",
func(server string) (err error) {
if server != "" {
Config.ServerURLOTLPHTTP, err = url.Parse(server)
}
return
})
flag.StringVar(&Config.SecretToken, "secret-token", "", "secret token for target server")
flag.StringVar(&Config.APIKey, "api-key", "", "API key for target server")
flag.BoolVar(&Config.Insecure, "insecure", false, "disable TLS, ignored by otlphttp exporter (default to value in config yaml)")
flag.BoolVar(&Config.InsecureSkipVerify, "insecure-skip-verify", false, "skip validating the remote server TLS certificates (default to value in config yaml)")
flag.Func("header",
"extra headers in key=value format when sending data to the server. Can be repeated. e.g. -header X-FIRST-HEADER=foo -header X-SECOND-HEADER=bar",
func(s string) error {
k, v, ok := strings.Cut(s, "=")
if !ok {
return fmt.Errorf("invalid header '%s': format must be key=value", s)
}
if len(Config.Headers) == 0 {
Config.Headers = make(map[string]string)
}
Config.Headers[k] = v
return nil
},
)
flag.StringVar(&Config.CollectorConfigPath, "config", "config.yaml", "path to collector config yaml")
flag.BoolVar(&Config.ExporterOTLP, "exporter-otlp", true, "benchmark exporter otlp")
flag.BoolVar(&Config.ExporterOTLPHTTP, "exporter-otlphttp", true, "benchmark exporter otlphttp")
flag.BoolVar(&Config.Logs, "logs", true, "benchmark logs")
flag.BoolVar(&Config.Metrics, "metrics", true, "benchmark metrics")
flag.BoolVar(&Config.Traces, "traces", true, "benchmark traces")
// `concurrency` is similar to `agents` config in apmbench
// Each value passed into `concurrency` list will be used as loadgenreceiver `concurrency` config
Config.ConcurrencyList = []int{1} // default
flag.Func("concurrency", "comma-separated `list` of concurrency (number of simulated agents) to run each benchmark with",
func(input string) error {
var concurrencyList []int
for _, val := range strings.Split(input, ",") {
val = strings.TrimSpace(val)
if val == "" {
continue
}
n, err := strconv.Atoi(val)
if err != nil || n <= 0 {
return fmt.Errorf("invalid value %q for -concurrency", val)
}
concurrencyList = append(concurrencyList, n)
}
sort.Ints(concurrencyList)
Config.ConcurrencyList = concurrencyList
return nil
},
)
flag.Func("telemetry-elasticsearch-url", "optional comma-separated `list` of remote Elasticsearch telemetry hosts",
func(input string) error {
var urls []string
for _, val := range strings.Split(input, ",") {
val = strings.TrimSpace(val)
if val == "" {
continue
}
urls = append(urls, val)
}
Config.Telemetry.ElasticsearchURL = urls
return nil
},
)
flag.StringVar(&Config.Telemetry.ElasticsearchUserName, "telemetry-elasticsearch-username", "", "optional remote Elasticsearch telemetry username")
flag.StringVar(&Config.Telemetry.ElasticsearchPassword, "telemetry-elasticsearch-password", "", "optional remote Elasticsearch telemetry password")
flag.StringVar(&Config.Telemetry.ElasticsearchAPIKey, "telemetry-elasticsearch-api-key", "", "optional remote Elasticsearch telemetry API key")
flag.DurationVar(&Config.Telemetry.ElasticsearchTimeout, "telemetry-elasticsearch-timeout", time.Minute, "optional remote Elasticsearch telemetry request timeout")
flag.StringVar(&Config.Telemetry.ElasticsearchIndex, "telemetry-elasticsearch-index", "metrics-*", "optional remote Elasticsearch telemetry metrics index pattern")
flag.StringVar(&Config.Telemetry.FilterClusterName, "telemetry-filter-cluster-name", "", "optional remote Elasticsearch telemetry cluster name metrics filter")
flag.StringVar(&Config.Telemetry.FilterProjectID, "telemetry-filter-project-id", "", "optional remote Elasticsearch telemetry project id metrics filter")
flag.Func("telemetry-metrics", "optional comma-separated `list` of remote Elasticsearch telemetry metrics to be reported",
func(input string) error {
var m []string
for _, val := range strings.Split(input, ",") {
val = strings.TrimSpace(val)
if val == "" {
continue
}
m = append(m, val)
}
Config.Telemetry.Metrics = m
return nil
},
)
flag.Lookup("telemetry-metrics").DefValue = strings.Join(defaultTelemetryMetrics, ",")
// Set needs to be done separately since `DefValue` won't set default value for flag.Func.
if err := flag.Set("telemetry-metrics", strings.Join(defaultTelemetryMetrics, ",")); err != nil {
return fmt.Errorf(`error setting default flag "telemetry-metrics" value: %w`, err)
}
// For configs that can be set via environment variables, set the required
// flags from env if they are not explicitly provided via command line
return setFlagsFromEnv()
}
func getAuthorizationHeaderValue(apiKey, secretToken string) string {
if apiKey != "" {
return fmt.Sprintf("ApiKey %s", apiKey)
} else if secretToken != "" {
return fmt.Sprintf("Bearer %s", secretToken)
}
return ""
}
// setFlagsFromEnv sets flags from some Elastic APM env vars
func setFlagsFromEnv() error {
// value[0] is environment key
// value[1] is default value
flagEnvMap := map[string][]string{
"endpoint": {"ELASTIC_APM_SERVER_URL", ""},
"secret-token": {"ELASTIC_APM_SECRET_TOKEN", ""},
"api-key": {"ELASTIC_APM_API_KEY", ""},
"telemetry-elasticsearch-url": {"TELEMETRY_ELASTICSEARCH_URL", ""},
"telemetry-elasticsearch-username": {"TELEMETRY_ELASTICSEARCH_USERNAME", ""},
"telemetry-elasticsearch-password": {"TELEMETRY_ELASTICSEARCH_PASSWORD", ""},
"telemetry-elasticsearch-api-key": {"TELEMETRY_ELASTICSEARCH_API_KEY", ""},
"telemetry-elasticsearch-index": {"TELEMETRY_ELASTICSEARCH_INDEX", ""},
}
for k, v := range flagEnvMap {
envVarValue := getEnvOrDefault(v[0], v[1])
if err := flag.Set(k, envVarValue); err != nil {
return fmt.Errorf("error setting flag \"-%s\" from env var %q with value %q: %w", k, v[0], envVarValue, err)
}
}
return nil
}
func getEnvOrDefault(name, defaultValue string) string {
value := os.Getenv(name)
if value != "" {
return value
}
return defaultValue
}
// setsToConfigs converts --set to --config
func setsToConfigs(sets []string) (configFiles []string) {
for _, s := range sets {
idx := strings.Index(s, "=")
if idx == -1 {
panic("missing = in --set") // Should never happen as all the strings are hardcoded in this file
}
v := "yaml:" + strings.TrimSpace(strings.ReplaceAll(s[:idx], ".", "::")) + ": " + strings.TrimSpace(s[idx+1:])
configFiles = append(configFiles, v)
}
return
}
func ExporterConfigs(exporter string) (configFiles []string) {
var configSets []string
configSets = append(configSets, fmt.Sprintf("service.pipelines.logs.exporters=[%s]", exporter))
configSets = append(configSets, fmt.Sprintf("service.pipelines.metrics.exporters=[%s]", exporter))
configSets = append(configSets, fmt.Sprintf("service.pipelines.traces.exporters=[%s]", exporter))
if Config.ServerURLOTLP != nil {
configSets = append(configSets, fmt.Sprintf("exporters.otlp.endpoint=%s", Config.ServerURLOTLP))
}
if Config.ServerURLOTLPHTTP != nil {
configSets = append(configSets, fmt.Sprintf("exporters.otlphttp.endpoint=%s", Config.ServerURLOTLPHTTP))
}
if v := getAuthorizationHeaderValue(Config.APIKey, Config.SecretToken); v != "" {
configSets = append(configSets, fmt.Sprintf("exporters.%s.headers.Authorization=%s", exporter, v))
}
for k, v := range Config.Headers {
configSets = append(configSets, fmt.Sprintf("exporters.%s.headers.%s=%s", exporter, k, v))
}
// Only set insecure and insecure_skip_verify on true, so that corresponding config value in yaml is used on default.
if Config.Insecure {
configSets = append(configSets, fmt.Sprintf("exporters.%s.tls.insecure=%v", exporter, Config.Insecure))
}
if Config.InsecureSkipVerify {
configSets = append(configSets, fmt.Sprintf("exporters.%s.tls.insecure_skip_verify=%v", exporter, Config.InsecureSkipVerify))
}
return setsToConfigs(configSets)
}
func DisableSignal(signal string) (configFiles []string) {
return setsToConfigs([]string{
fmt.Sprintf("service.pipelines.%s.receivers=[nop]", signal),
fmt.Sprintf("service.pipelines.%s.exporters=[nop]", signal),
})
}
func SetIterations(iterations int) (configFiles []string) {
return setsToConfigs([]string{
fmt.Sprintf("receivers.loadgen.logs.max_replay=%d", iterations),
fmt.Sprintf("receivers.loadgen.metrics.max_replay=%d", iterations),
fmt.Sprintf("receivers.loadgen.traces.max_replay=%d", iterations),
})
}
func SetConcurrency(concurrency int) (configFiles []string) {
return setsToConfigs([]string{
fmt.Sprintf("receivers.loadgen.concurrency=%d", concurrency),
})
}