internal/telemetrygen/traces/traces.go (139 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. // This file is forked from https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/790e18f1733e71debc7608aed98ace654ac76a60/cmd/telemetrygen/internal/traces/traces.go, // which is licensed under Apache-2 and Copyright The OpenTelemetry Authors. // // This original implementation is modified: // - Start function now only creates a logger when it is not already configured in cfg // - Use the correct error in batch span processor error logging. // - Use WithBlocking instead of WithBatchTimeout in BatchSpanProcessor package traces import ( "context" "fmt" "math" "strings" "sync" "sync/atomic" "time" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.4.0" "go.uber.org/zap" "golang.org/x/time/rate" "github.com/elastic/apm-perf/internal/telemetrygen/common" ) func Start(cfg *Config) error { logger := cfg.Logger if logger == nil { newLogger, err := common.CreateLogger(cfg.SkipSettingGRPCLogger) if err != nil { return err } logger = newLogger } var exp *otlptrace.Exporter if cfg.UseHTTP { var exporterOpts []otlptracehttp.Option logger.Info("starting HTTP exporter") exporterOpts, err := httpExporterOptions(cfg) if err != nil { return err } exp, err = otlptracehttp.New(context.Background(), exporterOpts...) if err != nil { return fmt.Errorf("failed to obtain OTLP HTTP exporter: %w", err) } } else { var exporterOpts []otlptracegrpc.Option logger.Info("starting gRPC exporter") exporterOpts, err := grpcExporterOptions(cfg) if err != nil { return err } exp, err = otlptracegrpc.New(context.Background(), exporterOpts...) if err != nil { return fmt.Errorf("failed to obtain OTLP gRPC exporter: %w", err) } } defer func() { logger.Info("stopping the exporter") if tempError := exp.Shutdown(context.Background()); tempError != nil { logger.Error("failed to stop the exporter", zap.Error(tempError)) } }() var ssp sdktrace.SpanProcessor if cfg.Batch { ssp = sdktrace.NewBatchSpanProcessor(exp, sdktrace.WithBlocking()) defer func() { logger.Info("stop the batch span processor") if tempError := ssp.Shutdown(context.Background()); tempError != nil { logger.Error("failed to stop the batch span processor", zap.Error(tempError)) } }() } var attributes []attribute.KeyValue // may be overridden by `--otlp-attributes service.name="foo"` attributes = append(attributes, semconv.ServiceNameKey.String(cfg.ServiceName)) attributes = append(attributes, cfg.GetAttributes()...) tracerProvider := sdktrace.NewTracerProvider( sdktrace.WithResource(resource.NewWithAttributes(semconv.SchemaURL, attributes...)), ) if cfg.Batch { tracerProvider.RegisterSpanProcessor(ssp) } otel.SetTracerProvider(tracerProvider) if err := Run(cfg, logger); err != nil { logger.Error("failed to execute the test scenario.", zap.Error(err)) return err } return nil } // Run executes the test scenario. func Run(c *Config, logger *zap.Logger) error { if c.TotalDuration > 0 { c.NumTraces = 0 } else if c.NumTraces <= 0 { return fmt.Errorf("either `traces` or `duration` must be greater than 0") } limit := rate.Limit(c.Rate) if c.Rate == 0 { limit = rate.Inf logger.Info("generation of traces isn't being throttled") } else { logger.Info("generation of traces is limited", zap.Float64("per-second", float64(limit))) } var statusCode codes.Code switch strings.ToLower(c.StatusCode) { case "0", "unset", "": statusCode = codes.Unset case "1", "error": statusCode = codes.Error case "2", "ok": statusCode = codes.Ok default: return fmt.Errorf("expected `status-code` to be one of (Unset, Error, Ok) or (0, 1, 2), got %q instead", c.StatusCode) } wg := sync.WaitGroup{} running := &atomic.Bool{} running.Store(true) telemetryAttributes := c.GetTelemetryAttributes() for i := 0; i < c.WorkerCount; i++ { wg.Add(1) w := worker{ numTraces: c.NumTraces, numChildSpans: int(math.Max(1, float64(c.NumChildSpans))), propagateContext: c.PropagateContext, statusCode: statusCode, limitPerSecond: limit, totalDuration: c.TotalDuration, running: running, wg: &wg, logger: logger.With(zap.Int("worker", i)), loadSize: c.LoadSize, spanDuration: c.SpanDuration, } go w.simulateTraces(telemetryAttributes) } if c.TotalDuration > 0 { time.Sleep(c.TotalDuration) running.Store(false) } wg.Wait() return nil }