util/common/traces/otlp/generator.go (119 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: MIT package otlp import ( "context" "errors" "time" "go.opentelemetry.io/contrib/propagators/aws/xray" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.17.0" "go.opentelemetry.io/otel/trace" "golang.org/x/exp/maps" "github.com/aws/amazon-cloudwatch-agent-test/util/common/traces/base" ) var generatorError = errors.New("Generator error") const ( serviceName = "load-generator" attributeKeyAwsXrayAnnotations = "aws.xray.annotations" ) type OtlpTracesGenerator struct { base.TraceGenerator base.TraceGeneratorInterface } func (g *OtlpTracesGenerator) StartSendingTraces(ctx context.Context) error { client, shutdown, err := setupClient(ctx) if err != nil { return err } defer shutdown(ctx) ticker := time.NewTicker(g.Cfg.Interval) for { select { case <-g.Done: ticker.Stop() return client.ForceFlush(ctx) case <-ticker.C: if err = g.Generate(ctx); err != nil { return err } } } } func (g *OtlpTracesGenerator) StopSendingTraces() { close(g.Done) } func NewLoadGenerator(cfg *base.TraceGeneratorConfig) *OtlpTracesGenerator { return &OtlpTracesGenerator{ TraceGenerator: base.TraceGenerator{ Cfg: cfg, Done: make(chan struct{}), SegmentsGenerationCount: 0, SegmentsEndedCount: 0, }, } } func (g *OtlpTracesGenerator) Generate(ctx context.Context) error { tracer := otel.Tracer("tracer") g.SegmentsGenerationCount++ _, span := tracer.Start(ctx, "example-span", trace.WithSpanKind(trace.SpanKindServer)) defer func() { span.End() g.SegmentsEndedCount++ }() if len(g.Cfg.Annotations) > 0 { span.SetAttributes(attribute.StringSlice(attributeKeyAwsXrayAnnotations, maps.Keys(g.Cfg.Annotations))) } span.SetAttributes(g.Cfg.Attributes...) return nil } func (g *OtlpTracesGenerator) GetSegmentCount() (int, int) { return g.SegmentsGenerationCount, g.SegmentsEndedCount } func (g *OtlpTracesGenerator) GetAgentConfigPath() string { return g.AgentConfigPath } func (g *OtlpTracesGenerator) GetAgentRuntime() time.Duration { return g.AgentRuntime } func (g *OtlpTracesGenerator) GetName() string { return g.Name } func (g *OtlpTracesGenerator) GetGeneratorConfig() *base.TraceGeneratorConfig { return g.Cfg } func setupClient(ctx context.Context) (*sdktrace.TracerProvider, func(context.Context) error, error) { res := resource.NewWithAttributes( semconv.SchemaURL, semconv.ServiceName(serviceName), ) tp, err := setupTraceProvider(ctx, res) if err != nil { return nil, nil, err } otel.SetTracerProvider(tp) otel.SetTextMapPropagator(xray.Propagator{}) return tp, func(context.Context) (err error) { timeoutCtx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() err = tp.Shutdown(timeoutCtx) if err != nil { return err } return nil }, nil } func setupTraceProvider(ctx context.Context, res *resource.Resource) (*sdktrace.TracerProvider, error) { exporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithInsecure()) if err != nil { return nil, err } return sdktrace.NewTracerProvider( sdktrace.WithSampler(sdktrace.AlwaysSample()), sdktrace.WithBatcher(exporter), sdktrace.WithResource(res), sdktrace.WithIDGenerator(xray.NewIDGenerator()), ), nil }