loadgen/eventhandler/apm-writer.go (138 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package eventhandler
import (
"fmt"
"time"
"github.com/elastic/apm-perf/pkg/supportedstacks"
"github.com/tidwall/gjson"
)
// writeAPMEvents writes to buffers in eventWriter JSON formatted events that can be replayed.
// Implements EventWriter interface.
func writeAPMEvents(config Config, minTimestamp time.Time, w *eventWriter, b batch, baseTimestamp time.Time, randomBits uint64) error {
rewriteAny := config.RewriteTimestamps ||
config.RewriteIDs ||
config.RewriteServiceNames ||
config.RewriteServiceNodeNames ||
config.RewriteServiceTargetNames ||
config.RewriteSpanNames ||
config.RewriteTransactionNames ||
config.RewriteTransactionTypes
var err error
metadata := b.metadata
if config.RewriteServiceNames {
metadata, err = randomizeASCIIField(metadata, "metadata.service.name", randomBits, &w.idBuf)
if err != nil {
return fmt.Errorf("failed to rewrite `service.name`: %w", err)
}
}
if config.RewriteServiceNodeNames {
// The intakev2 field name is `service.node.configured_name`,
// this is translated to `service.node.name` in the ES documents.
metadata, err = randomizeASCIIField(metadata, "metadata.service.node.configured_name", randomBits, &w.idBuf)
if err != nil {
return fmt.Errorf("failed to rewrite `service.node.name`: %w", err)
}
}
w.Write(metadata)
w.Write(newlineBytes)
for _, event := range b.events {
if !rewriteAny {
w.Write(event.payload)
w.Write(newlineBytes)
continue
}
w.rewriteBuf.RawByte('{')
w.rewriteBuf.String(event.objectType)
w.rewriteBuf.RawString(":")
rewriteJSONObject(w, gjson.GetBytes(event.payload, event.objectType), func(key, value gjson.Result) bool {
switch key.Str {
case "timestamp":
if config.RewriteTimestamps && !event.timestamp.IsZero() {
switch config.TargetStackVersion {
case supportedstacks.TargetStackVersionUnknown:
// if uknown assume latest to keep backward compatibility
rewriteTimestampLatest(event, minTimestamp, baseTimestamp, w)
case supportedstacks.TargetStackVersion7x:
rewriteTimestamp7x(event, minTimestamp, baseTimestamp, w)
case supportedstacks.TargetStackVersionLatest:
rewriteTimestampLatest(event, minTimestamp, baseTimestamp, w)
}
} else {
w.rewriteBuf.RawString(value.Raw)
}
case "id", "parent_id", "trace_id", "transaction_id":
if config.RewriteIDs && randomizeTraceID(&w.idBuf, value.Str, randomBits) {
w.rewriteBuf.RawByte('"')
w.rewriteBuf.RawBytes(w.idBuf.Bytes())
w.rewriteBuf.RawByte('"')
w.idBuf.Reset()
} else {
w.rewriteBuf.RawString(value.Raw)
}
case "name":
randomizeASCII(&w.idBuf, value.Str, randomBits)
switch {
case config.RewriteSpanNames && event.objectType == "span":
w.rewriteBuf.String(w.idBuf.String())
case config.RewriteTransactionNames && event.objectType == "transaction":
w.rewriteBuf.String(w.idBuf.String())
default:
w.rewriteBuf.RawString(value.Raw)
}
w.idBuf.Reset()
case "type":
switch {
case config.RewriteTransactionTypes && event.objectType == "transaction":
randomizeASCII(&w.idBuf, value.Str, randomBits)
w.rewriteBuf.String(w.idBuf.String())
w.idBuf.Reset()
default:
w.rewriteBuf.RawString(value.Raw)
}
case "context":
if !config.RewriteServiceTargetNames {
w.rewriteBuf.RawString(value.Raw)
break
}
rewriteJSONObject(w, value, func(key, value gjson.Result) bool {
if key.Str != "service" {
w.rewriteBuf.RawString(value.Raw)
return true
}
rewriteJSONObject(w, value, func(key, value gjson.Result) bool {
if key.Str != "target" {
w.rewriteBuf.RawString(value.Raw)
return true
}
rewriteJSONObject(w, value, func(key, value gjson.Result) bool {
if key.Str != "name" {
w.rewriteBuf.RawString(value.Raw)
return true
}
randomizeASCII(&w.idBuf, value.Str, randomBits)
w.rewriteBuf.String(w.idBuf.String())
w.idBuf.Reset()
return true
})
return true
})
return true
})
default:
w.rewriteBuf.RawString(value.Raw)
}
return true
})
w.rewriteBuf.RawString("}")
w.Write(w.rewriteBuf.Bytes())
w.Write(newlineBytes)
w.rewriteBuf.Reset()
}
return nil
}
func rewriteTimestamp7x(event event, minTimestamp time.Time, baseTimestamp time.Time, w *eventWriter) {
// 7.x does not support receiving string based timestamps
offset := event.timestamp.Sub(minTimestamp)
timestamp := baseTimestamp.Add(offset)
// The timestamp is in microseconds:
// https://github.com/elastic/apm-server/blob/7.17/model/modeldecoder/v2/model.go#L611
w.rewriteBuf.Uint64(uint64(timestamp.UnixMicro()))
}
func rewriteTimestampLatest(event event, minTimestamp time.Time, baseTimestamp time.Time, w *eventWriter) {
// We always encode rewritten timestamps as strings,
// so we don't lose any precision when offsetting by
// either the base timestamp, or the minimum timestamp
// across all the batches; string-formatted timestamps
// may have nanosecond precision.
offset := event.timestamp.Sub(minTimestamp)
timestamp := baseTimestamp.Add(offset)
w.rewriteBuf.RawByte('"')
w.rewriteBuf.Time(timestamp, time.RFC3339Nano)
w.rewriteBuf.RawByte('"')
}