internal/soaktest/runner.go (206 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
// Package soaktest contains code for defining soak testing scenarios and providing
// an interface for running them.
package soaktest
import (
"context"
cryptorand "crypto/rand"
"encoding/binary"
"errors"
"fmt"
"math/rand"
"net/url"
"os"
"strings"
"time"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/sync/errgroup"
"gopkg.in/yaml.v3"
"github.com/elastic/apm-perf/loadgen"
"github.com/elastic/apm-perf/pkg/supportedstacks"
)
// RunnerConfig holds the settings a Runner is created with.
type RunnerConfig struct {
// Scenario is the name of the scenario to run. NewRunner looks it up
// in the scenarios file and fails if it is not defined there.
Scenario string
// ScenariosPath is the path of the YAML file that holds the scenario
// definitions.
ScenariosPath string
// ServerURL is the target URL used for scenario entries that do not
// set their own. A "<project_id>" placeholder in it is replaced with
// the entry's project ID.
ServerURL string
// SecretToken is handed to the load generator as its token. It is
// redacted when the config is logged.
SecretToken string
// APIKeys maps a project ID to the API key used for scenario entries
// of that project that do not set their own key. Values are redacted
// when the config is logged.
APIKeys map[string]string
// Headers are merged on top of every scenario entry's headers,
// replacing entries with the same name. They are logged unredacted.
Headers map[string]string
// BypassProxy is only logged by the code visible here.
// NOTE(review): presumably consumed by the caller — confirm.
BypassProxy bool
// IgnoreErrors is passed through to the load generator.
IgnoreErrors bool
// RunForever when set to true, will keep the handler running
// until a signal is received to stop it.
RunForever bool
// RunDuration, when non-zero, bounds the run: Run cancels its context
// once this much time has elapsed.
RunDuration time.Duration
}
// redact masks a secret for logging: the result is the first 4 bytes
// of v followed by "-REDACTED".
// Inputs shorter than 4 bytes, the empty string included, yield the
// fixed placeholder "EMPTY"; this function is meant for secrets, which
// are not expected to be that short.
func redact(v string) string {
	const visible = 4
	if len(v) < visible {
		return "EMPTY"
	}
	return v[:visible] + "-REDACTED"
}
// MarshalLogObject writes the configuration to e as structured log
// fields. The secret token and every API key value are passed through
// redact first; header values are written as they are. It always
// returns nil.
func (r RunnerConfig) MarshalLogObject(e zapcore.ObjectEncoder) (_ error) {
	e.AddString("scenario", r.Scenario)
	e.AddString("scenarios_path", r.ScenariosPath)
	e.AddString("server_url", r.ServerURL)
	e.AddString("secret_token", redact(r.SecretToken))

	// One field per API key: the map key (project information) is kept
	// in the field name, the value is reduced to a short redacted hint.
	for project, key := range r.APIKeys {
		e.AddString(fmt.Sprintf("apikey[%s]", project), redact(key))
	}
	for name, value := range r.Headers {
		e.AddString(fmt.Sprintf("headers[%s]", name), value)
	}

	e.AddBool("bypass_proxy", r.BypassProxy)
	e.AddBool("ignore_errors", r.IgnoreErrors)
	e.AddBool("run_forever", r.RunForever)
	return nil
}
// Runner runs the agents of a single soak test scenario. Create one
// with NewRunner.
type Runner struct {
// config is the configuration passed to NewRunner.
config *RunnerConfig
// logger is the logger passed to NewRunner, named "soaktest".
logger *zap.Logger
// scenarioConfigs are the entries of the selected scenario; Run starts
// one or more agents for each of them.
scenarioConfigs []ScenarioConfig
// scenarioName is the name of the selected scenario (config.Scenario).
scenarioName string
}
// NewRunner returns a Runner that executes the soak test scenario named
// by config.Scenario.
//
// It reads the YAML file at config.ScenariosPath, decodes it and selects
// the entries registered under config.Scenario. Entries without a
// TargetVersion are given "latest". The returned Runner logs through
// logger under the name "soaktest".
//
// An error is returned when the file cannot be read, when it is not
// valid YAML for the Scenarios type, or when the scenario is unknown.
func NewRunner(config *RunnerConfig, logger *zap.Logger) (*Runner, error) {
	f, err := os.ReadFile(config.ScenariosPath)
	if err != nil {
		return nil, err
	}
	var y Scenarios
	if err := yaml.Unmarshal(f, &y); err != nil {
		return nil, err
	}
	scenarioConfigs := y.Scenarios[config.Scenario]
	if scenarioConfigs == nil {
		return nil, errors.New("unknown scenario " + config.Scenario)
	}
	// Index into the slice instead of ranging by value: the range value
	// is a copy of the element, so assigning to it would never reach the
	// configuration stored in the Runner.
	for i := range scenarioConfigs {
		// If no version preference is expressed set it to latest
		// to preserve backward compatibility.
		if scenarioConfigs[i].TargetVersion == "" {
			scenarioConfigs[i].TargetVersion = "latest"
		}
	}
	return &Runner{
		config:          config,
		scenarioConfigs: scenarioConfigs,
		logger:          logger.Named("soaktest"),
		scenarioName:    config.Scenario,
	}, nil
}
// Run starts the agents of the selected scenario and blocks until all
// of them have returned.
//
// Every scenario entry is started AgentReplicas times (once when the
// value is not positive), each replica in its own goroutine. The agents
// share a context that is cancelled when ctx is cancelled, when one
// agent returns an error, or, if config.RunDuration is non-zero, once
// that duration has elapsed.
//
// Run returns the first non-nil error returned by an agent, or an error
// if the random seed cannot be generated.
func (runner *Runner) Run(ctx context.Context) error {
	if runner.config.RunDuration != 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithCancel(ctx)
		// Release the derived context and the timer as soon as Run
		// returns, so nothing lingers for the rest of RunDuration when
		// the run ends early (seed failure, agent error).
		defer cancel()
		timer := time.AfterFunc(runner.config.RunDuration, cancel)
		defer timer.Stop()
	}
	g, gCtx := errgroup.WithContext(ctx)

	// Create a Rand with the same seed for each agent, so we randomise their IDs consistently.
	var rngseed int64
	err := binary.Read(cryptorand.Reader, binary.LittleEndian, &rngseed)
	if err != nil {
		return fmt.Errorf("failed to generate seed for math/rand: %w", err)
	}

	runner.logger.Info("running scenario `" + runner.scenarioName + "`...")
	for _, config := range runner.scenarioConfigs {
		// Per-iteration copy: the closures below capture it, and the
		// replica default must not leak into the stored configuration.
		config := config
		// when not specified, default to 1
		if config.AgentReplicas <= 0 {
			config.AgentReplicas = 1
		}
		for i := 0; i < config.AgentReplicas; i++ {
			runner.logger.Debug(fmt.Sprintf("agent: %s, replica %d, event-rate: %s", config.AgentName, i, config.EventRate))
			g.Go(func() error {
				// Each agent gets its own generator; a *rand.Rand
				// must not be shared between goroutines.
				rng := rand.New(rand.NewSource(rngseed))
				return runAgent(gCtx, runner, config, rng)
			})
		}
	}
	return g.Wait()
}
// runAgent runs a single agent for one scenario entry: it derives the
// load generation parameters from the runner and scenario configuration,
// creates an event handler from them and sends batches in a loop with
// ctx. Any error from those three steps is returned unchanged.
func runAgent(ctx context.Context, runner *Runner, config ScenarioConfig, rng *rand.Rand) error {
	log := runner.logger

	handlerParams, paramsErr := getHandlerParams(runner.config, config)
	if paramsErr != nil {
		return paramsErr
	}

	// The generator is attached before the parameters are logged, the
	// logger only afterwards.
	handlerParams.Rand = rng
	log.Debug("computed load generation parameters", zap.Object("params", handlerParams))
	handlerParams.Logger = log

	eventHandler, handlerErr := loadgen.NewEventHandler(handlerParams)
	if handlerErr != nil {
		return handlerErr
	}
	log.Debug("created event handler")

	return eventHandler.SendBatchesInLoop(ctx)
}
// getHandlerParams builds the load generator parameters for one scenario
// entry, filling the gaps in config from runnerConfig.
//
// Values missing from config fall back as follows: an empty AgentName
// becomes "apm-", an empty ServerURL becomes runnerConfig.ServerURL and
// an empty APIKey becomes the runner's key for config.ProjectID. The
// first "<project_id>" in the server URL and in each header value is
// replaced with config.ProjectID.
//
// It returns zero-valued parameters and an error when the server URL or
// the event rate cannot be parsed.
func getHandlerParams(runnerConfig *RunnerConfig, config ScenarioConfig) (loadgen.EventHandlerParams, error) {
	// expand substitutes the first <project_id> placeholder in s.
	expand := func(s string) string {
		return strings.Replace(s, "<project_id>", config.ProjectID, 1)
	}

	// Without an AgentName the file pattern matches all the APM agents,
	// which then share the allowed number of events for the given
	// duration (e.g. 4 agents send 10000/s in total).
	if config.AgentName == "" {
		config.AgentName = "apm-"
	}

	// Scenario headers go in first and runner headers on top, so the
	// runner's value wins when both define the same name.
	headers := make(map[string]string)
	for name, value := range config.Headers {
		headers[name] = expand(value)
	}
	for name, value := range runnerConfig.Headers {
		headers[name] = expand(value)
	}
	headers["X-Elastic-Project-Id"] = config.ProjectID

	if config.ServerURL == "" {
		config.ServerURL = runnerConfig.ServerURL
	}
	serverURL, err := url.Parse(expand(config.ServerURL))
	if err != nil {
		return loadgen.EventHandlerParams{}, err
	}

	if config.APIKey == "" {
		config.APIKey = runnerConfig.APIKeys[config.ProjectID]
	}

	burst, interval, err := loadgen.ParseEventRate(config.EventRate)
	if err != nil {
		return loadgen.EventHandlerParams{}, err
	}

	// Agents named otlp-* use the OTLP protocol; the rest of the name
	// narrows the data type, everything else stays on the defaults.
	protocol, datatype := "apm/http", "any"
	if strings.HasPrefix(config.AgentName, "otlp-") {
		protocol = "otlp/http"
		switch {
		case strings.HasPrefix(config.AgentName, "otlp-logs"):
			datatype = "logs"
		case strings.HasPrefix(config.AgentName, "otlp-metrics"):
			datatype = "metrics"
		case strings.HasPrefix(config.AgentName, "otlp-traces"):
			datatype = "traces"
		}
	}

	return loadgen.EventHandlerParams{
		Path:                      config.AgentName + "*.ndjson",
		URL:                       serverURL.String(),
		APIKey:                    config.APIKey,
		Token:                     runnerConfig.SecretToken,
		IgnoreErrors:              runnerConfig.IgnoreErrors,
		RunForever:                runnerConfig.RunForever,
		Limiter:                   loadgen.GetNewLimiter(burst, interval),
		RewriteIDs:                config.RewriteIDs,
		RewriteServiceNames:       config.RewriteServiceNames,
		RewriteServiceNodeNames:   config.RewriteServiceNodeNames,
		RewriteServiceTargetNames: config.RewriteServiceTargetNames,
		RewriteSpanNames:          config.RewriteSpanNames,
		RewriteTransactionNames:   config.RewriteTransactionNames,
		RewriteTransactionTypes:   config.RewriteTransactionTypes,
		RewriteTimestamps:         config.RewriteTimestamps,
		Headers:                   headers,
		Protocol:                  protocol,
		Datatype:                  datatype,
		TargetStackVersion:        targetStackVer(config),
	}, nil
}
// targetStackVer converts config.TargetVersion with
// supportedstacks.FromStringVersion. The error from the conversion is
// discarded, so the caller receives whatever value accompanied it.
func targetStackVer(config ScenarioConfig) supportedstacks.TargetStackVersion {
	version, _ := supportedstacks.FromStringVersion(config.TargetVersion)
	return version
}