internal/telemetrygen/traces/worker.go (89 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
// This file is forked from https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/790e18f1733e71debc7608aed98ace654ac76a60/cmd/telemetrygen/internal/traces/worker.go,
// which is licensed under Apache-2 and Copyright The OpenTelemetry Authors.
//
// This file does not contain functional modifications.
package traces
import (
"context"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"golang.org/x/time/rate"
)
type worker struct {
running *atomic.Bool // pointer to shared flag that indicates it's time to stop the test
numTraces int // how many traces the worker has to generate (only when duration==0)
numChildSpans int // how many child spans the worker has to generate per trace
propagateContext bool // whether the worker needs to propagate the trace context via HTTP headers
statusCode codes.Code // the status code set for the child and parent spans
totalDuration time.Duration // how long to run the test for (overrides `numTraces`)
limitPerSecond rate.Limit // how many spans per second to generate
wg *sync.WaitGroup // notify when done
loadSize int // desired minimum size in MB of string data for each generated trace
spanDuration time.Duration // duration of generated spans
logger *zap.Logger
}
const (
fakeIP string = "1.2.3.4"
charactersPerMB = 1024 * 1024 // One character takes up one byte of space, so this number comes from the number of bytes in a megabyte
)
func (w worker) simulateTraces(telemetryAttributes []attribute.KeyValue) {
tracer := otel.Tracer("telemetrygen")
limiter := rate.NewLimiter(w.limitPerSecond, 1)
var i int
for w.running.Load() {
spanStart := time.Now()
spanEnd := spanStart.Add(w.spanDuration)
ctx, sp := tracer.Start(context.Background(), "lets-go", trace.WithAttributes(
semconv.NetPeerIPKey.String(fakeIP),
semconv.PeerServiceKey.String("telemetrygen-server"),
),
trace.WithSpanKind(trace.SpanKindClient),
trace.WithTimestamp(spanStart),
)
sp.SetAttributes(telemetryAttributes...)
for j := 0; j < w.loadSize; j++ {
sp.SetAttributes(attribute.String(fmt.Sprintf("load-%v", j), string(make([]byte, charactersPerMB))))
}
childCtx := ctx
if w.propagateContext {
header := propagation.HeaderCarrier{}
// simulates going remote
otel.GetTextMapPropagator().Inject(childCtx, header)
// simulates getting a request from a client
childCtx = otel.GetTextMapPropagator().Extract(childCtx, header)
}
var endTimestamp trace.SpanEventOption
for j := 0; j < w.numChildSpans; j++ {
_, child := tracer.Start(childCtx, "okey-dokey-"+strconv.Itoa(j), trace.WithAttributes(
semconv.NetPeerIPKey.String(fakeIP),
semconv.PeerServiceKey.String("telemetrygen-client"),
),
trace.WithSpanKind(trace.SpanKindServer),
trace.WithTimestamp(spanStart),
)
child.SetAttributes(telemetryAttributes...)
if err := limiter.Wait(context.Background()); err != nil {
w.logger.Fatal("limiter waited failed, retry", zap.Error(err))
}
endTimestamp = trace.WithTimestamp(spanEnd)
child.SetStatus(w.statusCode, "")
child.End(endTimestamp)
// Reset the start and end for next span
spanStart = spanEnd
spanEnd = spanStart.Add(w.spanDuration)
}
sp.SetStatus(w.statusCode, "")
sp.End(endTimestamp)
i++
if w.numTraces != 0 {
if i >= w.numTraces {
break
}
}
}
w.logger.Info("traces generated", zap.Int("traces", i))
w.wg.Done()
}