// loadgen/eventhandler/handler.go
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
// Package eventhandler contains code for replaying batch of agent data to APM
// server.
package eventhandler
import (
"bufio"
"bytes"
"context"
cryptorand "crypto/rand"
"encoding/binary"
"errors"
"fmt"
"io/fs"
"math/bits"
"math/rand"
"strings"
"sync"
"time"
"github.com/elastic/apm-perf/pkg/supportedstacks"
"github.com/klauspost/compress/zlib"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
"go.elastic.co/fastjson"
"go.uber.org/zap"
"golang.org/x/time/rate"
)
var (
	// newlineBytes delimits ND-JSON events when writing batches.
	newlineBytes = []byte("\n")
	// supportedTSFormats lists variations of RFC3339 for supporting
	// different formats for the timezone offset. Copied from apm-data.
	supportedTSFormats = []string{
		"2006-01-02T15:04:05Z07:00", // RFC3339
		"2006-01-02T15:04:05Z0700",
		"2006-01-02T15:04:05Z07",
	}
)
// batch is a single replayable event stream: one raw metadata line
// followed by the events recorded under that metadata.
type batch struct {
	metadata []byte  // raw ND-JSON metadata line
	events   []event // events belonging to this metadata line
}
// event is a single recorded event line plus fields extracted from it
// during batch collection.
type event struct {
	payload    []byte    // raw ND-JSON event line
	objectType string    // presumably the event's top-level object key — confirm against EventCollector.Process
	timestamp  time.Time // event timestamp; zero when none was extracted
}
// Handler replays stored events to an APM Server.
//
// It is safe to make concurrent calls to Handler methods.
type Handler struct {
	mu           sync.Mutex // guards config.Rand
	config       Config
	logger       *zap.Logger
	batches      []batch
	minTimestamp time.Time // across all batches
}
// Config holds configuration for Handler.
type Config struct {
	// Path holds the path to a file within Storage, holding recorded
	// event batches for replaying. Path may be a glob, in which case
	// it may match more than one file.
	//
	// The file contents are expected to hold ND-JSON event streams in
	// plain text. These may be transformed, and will be compressed
	// (with zlib.BestSpeed to minimise overhead), during replay.
	Path string
	// Storage holds the fs.FS from which the file specified by Path
	// will read, for extracting batches.
	Storage fs.FS
	// Limiter holds a rate.Limiter for controlling the event rate.
	//
	// If Limiter is nil, an infinite rate limiter will be used.
	Limiter *rate.Limiter
	// Transport holds the Transport that will be used for replaying
	// event batches.
	Transport Transport
	// Rand, if non-nil, will be used for field randomization.
	//
	// If Rand is nil, then a new Rand will be created, seeded with
	// a value read from crypto/rand. If Rand is supplied (non-nil),
	// it must not be invoked concurrently by any other goroutines.
	Rand *rand.Rand
	// IgnoreErrors when set to true ignores HTTP errors while sending
	// events using the event handler.
	IgnoreErrors bool
	// RunForever when set to true, will keep the handler running
	// until a signal is received to stop it.
	RunForever bool
	// Writer writes replayable events to buffer.
	Writer EventWriter
	// RewriteTimestamps controls whether event timestamps are rewritten
	// during replay.
	//
	// If this is false, then the original timestamps are unmodified.
	//
	// If this is true, then for each call to SendBatches the timestamps
	// will be adjusted as follows: first we calculate the smallest timestamp
	// across all of the batches, and then compute the delta between an
	// event's timestamp and the smallest timestamp; this is then added
	// to the current time as recorded when SendBatches is invoked.
	RewriteTimestamps bool
	// RewriteIDs controls whether trace, transaction, span, and error
	// event IDs are rewritten.
	//
	// If this is false, then the original IDs are unmodified.
	//
	// If this is true, then for each call to SendBatches the event IDs
	// will be adjusted as follows: first, we generate a random 64-bit
	// integer. Next, for each event ID whose value is entirely hex
	// digits, we will decode each hex digit to a 4-bit value, XOR it
	// with the next 4 bits in the random number (rotating left), and
	// then re-encode as a hex digit. Note that we intentionally use a
	// single random value to ensure IDs are randomised consistently,
	// such that event relationships are maintained.
	RewriteIDs bool
	// RewriteServiceNames controls the rewriting of `service.name`
	// in events.
	RewriteServiceNames bool
	// RewriteServiceNodeNames controls the rewriting of
	// `service.node.name` in events.
	RewriteServiceNodeNames bool
	// RewriteServiceTargetNames controls the rewriting of
	// `service.target.name` in events.
	RewriteServiceTargetNames bool
	// RewriteSpanNames controls the rewriting of `span.name` in events.
	RewriteSpanNames bool
	// RewriteTransactionNames controls the rewriting of `transaction.name`
	// in events.
	RewriteTransactionNames bool
	// RewriteTransactionTypes controls the rewriting of `transaction.type`
	// in events.
	RewriteTransactionTypes bool
	// TargetStackVersion represents the version of the stack that event
	// generation is targeting. This allows to tailor the generator behavior
	// if needed (e.g. rewriting timestamps).
	TargetStackVersion supportedstacks.TargetStackVersion
}
// New creates a new Handler with config.
//
// It validates config, seeds config.Rand from crypto/rand when unset,
// and loads every batch from the files in config.Storage matching the
// config.Path glob. An error is returned if validation fails, a file
// cannot be read, or no batches were found.
func New(logger *zap.Logger, config Config, ec EventCollector) (*Handler, error) {
	if config.Path == "" {
		return nil, errors.New("eventhandler: path is required")
	}
	if config.Transport == nil {
		return nil, errors.New("empty transport received")
	}
	if config.Limiter == nil {
		config.Limiter = rate.NewLimiter(rate.Inf, 0)
	}
	if config.Rand == nil {
		var rngseed int64
		if err := binary.Read(cryptorand.Reader, binary.LittleEndian, &rngseed); err != nil {
			return nil, fmt.Errorf("failed to generate seed for math/rand: %w", err)
		}
		config.Rand = rand.New(rand.NewSource(rngseed))
	}
	if config.Writer == nil {
		return nil, errors.New("empty writer received")
	}
	h := Handler{
		logger: logger.Named("handler"),
		config: config,
	}
	matches, err := fs.Glob(config.Storage, config.Path)
	if err != nil {
		return nil, err
	}
	logger.Debug("file matching pattern found for batch extraction", zap.String("matches", strings.Join(matches, ",")))
	for _, path := range matches {
		if err := h.loadBatchesFromFile(path, ec); err != nil {
			return nil, err
		}
	}
	if len(h.batches) == 0 {
		return nil, fmt.Errorf("eventhandler: found no elements to replay from %s", config.Path)
	}
	logger.Debug("collected batches", zap.Int("size", len(h.batches)))
	return &h, nil
}

// loadBatchesFromFile scans a single ND-JSON file, appending the extracted
// batches to h.batches and keeping h.minTimestamp up to date. Each metadata
// line starts a new batch; subsequent event lines are attached to it.
func (h *Handler) loadBatchesFromFile(path string, ec EventCollector) error {
	f, err := h.config.Storage.Open(path)
	if err != nil {
		return err
	}
	// The file is only read; a Close error carries no actionable information.
	defer f.Close()
	s := bufio.NewScanner(f)
	var current *batch
	for s.Scan() {
		line := s.Bytes()
		if len(line) == 0 {
			continue
		}
		// TODO(marclop): Support RUM headers and handle them differently.
		if err := ec.Filter(line); err != nil {
			return fmt.Errorf("line filter failed: %w", err)
		}
		// Copy the line, as it will be overwritten by the next scan.
		linecopy := make([]byte, len(line))
		copy(linecopy, line)
		// If the line is metadata, start a new batch.
		if ec.IsMeta(line) {
			h.batches = append(h.batches, batch{metadata: linecopy})
			current = &h.batches[len(h.batches)-1]
			continue
		}
		// Guard against malformed input: an event line must follow a
		// metadata line, otherwise there is no batch to attach it to
		// (previously this dereferenced a nil pointer and panicked).
		if current == nil {
			return fmt.Errorf("eventhandler: %s contains an event line before any metadata line", path)
		}
		event := ec.Process(linecopy)
		if !event.timestamp.IsZero() {
			if h.minTimestamp.IsZero() || event.timestamp.Before(h.minTimestamp) {
				h.minTimestamp = event.timestamp
			}
		}
		current.events = append(current.events, event)
	}
	// Scanner errors (e.g. a line exceeding the token size limit) were
	// previously silently dropped.
	if err := s.Err(); err != nil {
		return fmt.Errorf("failed to scan %s: %w", path, err)
	}
	return nil
}
// SendBatchesInLoop will send events in loop, such that it can be used to warm up the remote APM Server,
// by sending events until the context is canceled.
func (h *Handler) SendBatchesInLoop(ctx context.Context) error {
	// The state must outlive individual sendBatches calls: the total number
	// of events in h.batches can be smaller than the configured burst, so a
	// single pass may finish before a full burst has been sent. Carrying the
	// state across passes lets the next one resume mid-burst instead of
	// waiting a full limiter interval with s.sent reset to zero.
	s := state{
		burst: h.config.Limiter.Burst(),
	}
	// Send events in passes; on error either restart from where we left
	// off (RunForever) or bail out.
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		if _, err := h.sendBatches(ctx, &s); err != nil {
			if !h.config.RunForever {
				return err
			}
			h.logger.Error("failed to send batch of events", zap.Error(err))
			continue
		}
		// Keep s.sent bounded (it would otherwise creep towards
		// math.MaxInt) while preserving the remainder so the next pass
		// knows its offset within the current burst.
		if s.burst > 0 {
			s.sent %= s.burst
		}
	}
}
// SendBatches sends the loaded trace data to the configured transport,
// returning the total number of documents sent and any transport errors.
func (h *Handler) SendBatches(ctx context.Context) (int, error) {
	// A fresh state per call: unlike SendBatchesInLoop, progress is not
	// carried over between invocations.
	s := state{burst: h.config.Limiter.Burst()}
	return h.sendBatches(ctx, &s)
}
// sendBatches replays every loaded batch once, accumulating progress in s.
// The random bits and base timestamp are drawn once up front so that ID and
// timestamp rewriting remain consistent across all batches of this pass.
func (h *Handler) sendBatches(ctx context.Context, s *state) (int, error) {
	h.mu.Lock() // config.Rand is not safe for concurrent use
	randomBits := h.config.Rand.Uint64()
	h.mu.Unlock()
	h.logger.Debug("got random bits for batch")

	baseTimestamp := time.Now().UTC()
	h.logger.Debug("calculated base timestamp", zap.String("timestamp", baseTimestamp.String()))

	for _, b := range h.batches {
		if err := h.sendBatch(ctx, s, b, baseTimestamp, randomBits); err != nil {
			return s.sent, fmt.Errorf("cannot send batch: %w", err)
		}
	}
	return s.sent, nil
}
// sendBatch sends the events of a single batch through the transport, in
// chunks bounded by the rate limiter's burst size. s carries burst progress
// across batches, so one burst may span the boundary between two batches.
func (h *Handler) sendBatch(
	ctx context.Context,
	s *state,
	b batch,
	baseTimestamp time.Time,
	randomBits uint64,
) error {
	events := b.events
	h.logger.Debug("sending events in burst", zap.Int("size", len(events)))
	for len(events) > 0 {
		n := len(events)
		if s.burst > 0 {
			mod := s.sent % s.burst
			if mod == 0 {
				// We're starting a new iteration, so wait to send a burst.
				if err := h.config.Limiter.WaitN(ctx, s.burst); err != nil {
					return fmt.Errorf("cannot wait for limiter to allow burst: %w", err)
				}
			}
			// Send as many events of the batch as we can, up to the
			// burst size minus however many events have been sent for
			// this iteration.
			capacity := s.burst - mod
			h.logger.Debug("calculated capacity", zap.Int("value", capacity))
			if n > capacity {
				n = capacity
			}
		}
		// Compress the metadata plus the next n events into a fresh
		// in-memory buffer via the configured EventWriter.
		writer := newEventWriter()
		if err := h.config.Writer(h.config, h.minTimestamp,
			writer, batch{
				metadata: b.metadata,
				events:   events[:n],
			}, baseTimestamp, randomBits); err != nil {
			return fmt.Errorf("cannot write events: %w", err)
		}
		// Close flushes the zlib stream; the payload is not complete until then.
		if err := writer.Close(); err != nil {
			return fmt.Errorf("cannot close writer: %w", err)
		}
		h.logger.Debug("wrote events to buffer",
			zap.Int("bytes.uncompressed", writer.written),
			zap.Int("bytes.compressed", writer.buf.Len()),
		)
		h.logger.Debug("closed writer")
		// Do not reuse `writer`: in error cases, SendEvents may return while the request body is
		// still being transmitted by the HTTP library. Reusing `writer` could cause a panic due to
		// concurrent reads & writes.
		if err := h.config.Transport.SendEvents(ctx, &writer.buf, h.config.IgnoreErrors); err != nil {
			return fmt.Errorf("cannot send events through transport: %w", err)
		}
		h.logger.Debug("sent events through transport")
		// Only count events after a successful send, then advance the window.
		s.sent += n
		events = events[n:]
	}
	return nil
}
// rewriteJSONObject serializes object into w.rewriteBuf as a JSON object,
// writing each key itself and delegating each value to f. The callback is
// expected to write the value for the given key; returning false from f
// stops the iteration early.
func rewriteJSONObject(w *eventWriter, object gjson.Result, f func(key, value gjson.Result) bool) {
	w.rewriteBuf.RawByte('{')
	needComma := false
	object.ForEach(func(key, value gjson.Result) bool {
		if needComma {
			w.rewriteBuf.RawByte(',')
		}
		needComma = true
		w.rewriteBuf.RawString(key.Raw)
		w.rewriteBuf.RawByte(':')
		return f(key, value)
	})
	w.rewriteBuf.RawByte('}')
}
func randomizeTraceID(out *bytes.Buffer, in string, randomBits uint64) bool {
n := len(in)
for i := 0; i < n; i++ {
b := in[i]
if !((b >= '0' && b <= '9') ||
(b >= 'a' && b <= 'f') ||
(b >= 'A' && b <= 'F')) {
// Not all hex.
return false
}
}
out.Grow(n)
for i := 0; i < n; i++ {
b := in[i]
var h uint8
switch {
case b >= '0' && b <= '9':
h = b - '0'
case b >= 'a' && b <= 'f':
h = 10 + (b - 'a')
case b >= 'A' && b <= 'F':
h = 10 + (b - 'A')
}
h = (h ^ uint8(randomBits)) & 0x0f
randomBits = bits.RotateLeft64(randomBits, 4)
if h < 10 {
b = '0' + h
} else {
b = 'a' + (h - 10)
}
out.WriteByte(b)
}
return true
}
// randomizeASCIIField replaces the string value at path in data with a
// randomized variant (see randomizeASCII), using scratch as a temporary
// buffer that is reset before returning. If path does not exist in data,
// the input is returned unmodified.
func randomizeASCIIField(data []byte, path string, randomBits uint64, scratch *bytes.Buffer) ([]byte, error) {
	result := gjson.GetBytes(data, path)
	if !result.Exists() {
		return data, nil
	}
	defer scratch.Reset()
	randomizeASCII(scratch, result.Str, randomBits)
	updated, err := sjson.SetBytes(data, path, scratch.String())
	if err != nil {
		return nil, fmt.Errorf("failed to rewrite %q: %w", path, err)
	}
	return updated, nil
}
// randomizeASCII replaces ASCII letter and digit runes in the input
// with random ASCII runes in the same category.
func randomizeASCII(out *bytes.Buffer, in string, randomBits uint64) {
for _, r := range in {
// '0' > 'A' > 'a'
if r < '0' || r > 'z' {
out.WriteRune(r)
continue
}
// Use 5 bits, which is enough to cover either
// 26 ASCII letters or 10 ASCII digits.
i := (uint8(randomBits) & 0x1f)
randomBits = bits.RotateLeft64(randomBits, 5)
switch {
case r >= 'a':
r = rune('a' + i%26)
case r >= 'A' && r <= 'Z':
r = rune('A' + i%26)
case r <= '9':
r = rune('0' + i%10)
}
out.WriteRune(r)
}
}
// state tracks sending progress across calls to sendBatches, allowing a
// partially-completed burst to resume where the previous pass left off.
type state struct {
	burst int // the rate limiter's burst size; values <= 0 disable burst chunking
	sent  int // number of events sent so far
}
// eventWriter compresses written event payloads into an in-memory buffer
// while tracking the uncompressed size, and carries scratch buffers used
// during event rewriting.
type eventWriter struct {
	rewriteBuf fastjson.Writer // scratch buffer for rewriting JSON objects
	idBuf      bytes.Buffer    // scratch buffer for randomizing event IDs
	buf        bytes.Buffer    // destination for the zlib-compressed payload
	*zlib.Writer               // compresses writes into buf
	written    int             // total uncompressed bytes written
}
// newEventWriter returns an eventWriter whose zlib stream (BestSpeed, to
// minimise CPU overhead) compresses into a preallocated in-memory buffer.
func newEventWriter() *eventWriter {
	w := &eventWriter{}
	// Preallocate to minimize memory copies.
	w.buf.Grow(2 * 1024 * 1024)
	zw, err := zlib.NewWriterLevel(&w.buf, zlib.BestSpeed)
	if err != nil {
		// NewWriterLevel fails only for an invalid compression level,
		// which would be a programming error here.
		panic(err.Error())
	}
	w.Writer = zw
	return w
}
// Write implements io.Writer, forwarding p to the underlying zlib writer
// while accounting for the uncompressed byte count in w.written.
func (w *eventWriter) Write(p []byte) (int, error) {
	w.written += len(p)
	return w.Writer.Write(p)
}