internal/telemetrygen/metrics/worker.go (90 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. // This file is forked from https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/790e18f1733e71debc7608aed98ace654ac76a60/cmd/telemetrygen/internal/metrics/worker.go, // which is licensed under Apache-2 and Copyright The OpenTelemetry Authors. // // This file does not contain functional modifications. package metrics import ( "context" "sync" "sync/atomic" "time" "go.opentelemetry.io/otel/attribute" sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/resource" "go.uber.org/zap" "golang.org/x/time/rate" ) type worker struct { running *atomic.Bool // pointer to shared flag that indicates it's time to stop the test metricType metricType // type of metric to generate numMetrics int // how many metrics the worker has to generate (only when duration==0) totalDuration time.Duration // how long to run the test for (overrides `numMetrics`) limitPerSecond rate.Limit // how many metrics per second to generate wg *sync.WaitGroup // notify when done logger *zap.Logger // logger index int // worker index } func (w worker) simulateMetrics(res *resource.Resource, exporterFunc func() (sdkmetric.Exporter, error), signalAttrs []attribute.KeyValue) { limiter := rate.NewLimiter(w.limitPerSecond, 1) exporter, err := exporterFunc() if err != nil { w.logger.Error("failed to create the exporter", zap.Error(err)) return } defer func() { w.logger.Info("stopping the exporter") if tempError := exporter.Shutdown(context.Background()); tempError != nil { w.logger.Error("failed to stop the exporter", zap.Error(tempError)) } }() var i int64 for w.running.Load() { var metrics []metricdata.Metrics switch w.metricType { case metricTypeGauge: metrics = append(metrics, metricdata.Metrics{ Name: "gen", Data: metricdata.Gauge[int64]{ DataPoints: []metricdata.DataPoint[int64]{ { Time: time.Now(), Value: i, Attributes: attribute.NewSet(signalAttrs...), }, }, }, }) case metricTypeSum: metrics = append(metrics, metricdata.Metrics{ Name: "gen", Data: metricdata.Sum[int64]{ IsMonotonic: true, Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.DataPoint[int64]{ { StartTime: time.Now().Add(-1 * time.Second), Time: time.Now(), Value: i, Attributes: attribute.NewSet(signalAttrs...), }, }, }, }) default: w.logger.Fatal("unknown metric type") } rm := metricdata.ResourceMetrics{ Resource: res, ScopeMetrics: []metricdata.ScopeMetrics{{Metrics: metrics}}, } if err := exporter.Export(context.Background(), &rm); err != nil { w.logger.Fatal("exporter failed", zap.Error(err)) } if err := limiter.Wait(context.Background()); err != nil { w.logger.Fatal("limiter wait failed, retry", zap.Error(err)) } i++ if w.numMetrics != 0 && i >= int64(w.numMetrics) { break } } w.logger.Info("metrics generated", zap.Int64("metrics", i)) w.wg.Done() }