pkg/telemetrygen/generator.go (69 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package telemetrygen
import (
"context"
cryptorand "crypto/rand"
"encoding/binary"
"fmt"
"math/rand"
"go.uber.org/zap"
"github.com/elastic/apm-perf/loadgen"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
)
type Generator struct {
Config Config
Logger *zap.Logger
}
func New(cfg Config) (*Generator, error) {
if err := cfg.Validate(); err != nil {
return &Generator{}, fmt.Errorf("cannot create generator, configuration is invalid: %w", err)
}
return &Generator{Config: cfg, Logger: zap.NewNop()}, nil
}
func (g *Generator) RunBlocking(ctx context.Context) error {
limiter := loadgen.GetNewLimiter(g.Config.EventRate.Burst, g.Config.EventRate.Interval)
errg, gCtx := errgroup.WithContext(ctx)
var rngseed int64
err := binary.Read(cryptorand.Reader, binary.LittleEndian, &rngseed)
if err != nil {
return fmt.Errorf("failed to generate seed for math/rand: %w", err)
}
for i := 0; i < g.Config.AgentReplicas; i++ {
for _, expr := range []string{`apm-go*.ndjson`, `apm-nodejs*.ndjson`, `apm-python*.ndjson`, `apm-ruby*.ndjson`} {
expr := expr
errg.Go(func() error {
rng := rand.New(rand.NewSource(rngseed))
return runAgent(gCtx, g.Logger, expr, limiter, rng, g.Config)
})
}
}
return errg.Wait()
}
func runAgent(ctx context.Context, l *zap.Logger, expr string, limiter *rate.Limiter, rng *rand.Rand, cfg Config) error {
handler, err := loadgen.NewEventHandler(loadgen.EventHandlerParams{
Logger: l,
URL: cfg.ServerURL.String(),
Path: expr,
APIKey: cfg.APIKey,
Limiter: limiter,
Rand: rng,
RewriteIDs: cfg.RewriteIDs,
RewriteServiceNames: cfg.RewriteServiceNames,
RewriteServiceNodeNames: cfg.RewriteServiceNodeNames,
RewriteServiceTargetNames: cfg.RewriteServiceTargetNames,
RewriteSpanNames: cfg.RewriteSpanNames,
RewriteTransactionNames: cfg.RewriteTransactionNames,
RewriteTransactionTypes: cfg.RewriteTransactionTypes,
RewriteTimestamps: cfg.RewriteTimestamps,
Headers: cfg.Headers,
Protocol: "apm/http",
TargetStackVersion: cfg.TargetStackVersion,
})
if err != nil {
return fmt.Errorf("cannot create event handler: %w", err)
}
if _, err := handler.SendBatches(ctx); err != nil {
return fmt.Errorf("cannot send batches: %w", err)
}
return nil
}