internal/telemetrygen/logs/worker.go (59 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. // This file is forked from https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/790e18f1733e71debc7608aed98ace654ac76a60/cmd/telemetrygen/internal/logs/worker.go, // which is licensed under Apache-2 and Copyright The OpenTelemetry Authors. // // This file does not contain functional modifications. package logs import ( "context" "sync" "sync/atomic" "time" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/resource" "go.uber.org/zap" "golang.org/x/time/rate" ) type worker struct { running *atomic.Bool // pointer to shared flag that indicates it's time to stop the test numLogs int // how many logs the worker has to generate (only when duration==0) body string // the body of the log totalDuration time.Duration // how long to run the test for (overrides `numLogs`) limitPerSecond rate.Limit // how many logs per second to generate wg *sync.WaitGroup // notify when done logger *zap.Logger // logger index int // worker index } func (w worker) simulateLogs(res *resource.Resource, exporter exporter, telemetryAttributes []attribute.KeyValue) { limiter := rate.NewLimiter(w.limitPerSecond, 1) var i int64 for w.running.Load() { logs := plog.NewLogs() nRes := logs.ResourceLogs().AppendEmpty().Resource() attrs := res.Attributes() for _, attr := range attrs { nRes.Attributes().PutStr(string(attr.Key), attr.Value.AsString()) } log := logs.ResourceLogs().At(0).ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() log.Body().SetStr(w.body) log.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) log.SetDroppedAttributesCount(1) log.SetSeverityNumber(plog.SeverityNumberInfo) log.SetSeverityText("Info") log.Attributes() lattrs := log.Attributes() lattrs.PutStr("app", "server") for i, key := range telemetryAttributes { lattrs.PutStr(key.Value.AsString(), telemetryAttributes[i].Value.AsString()) } if err := exporter.export(logs); err != nil { w.logger.Fatal("exporter failed", zap.Error(err)) } if err := limiter.Wait(context.Background()); err != nil { w.logger.Fatal("limiter wait failed, retry", zap.Error(err)) } i++ if w.numLogs != 0 && i >= int64(w.numLogs) { break } } w.logger.Info("logs generated", zap.Int64("logs", i)) w.wg.Done() }