receiver/loadgenreceiver/metrics.go (174 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package loadgenreceiver // import "github.com/elastic/opentelemetry-collector-components/receiver/loadgenreceiver" import ( "bufio" "bytes" "context" _ "embed" "errors" "os" "sync" "sync/atomic" "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" "github.com/elastic/opentelemetry-collector-components/receiver/loadgenreceiver/internal/list" ) const counterAttr = "loadgenreceiver_counter" //go:embed testdata/metrics.jsonl var demoMetrics []byte type metricsGenerator struct { cfg *Config logger *zap.Logger samples *list.LoopingList[pmetric.Metrics] stats Stats statsMu sync.Mutex consumer consumer.Metrics cancelFn context.CancelFunc inflightConcurrency sync.WaitGroup // counter is an increasing counter tracking the number of times nextMetrics is called. // It is fine to overflow the Int64, as it is currently only used as a unique dimension in metrics. counter atomic.Int64 } func createMetricsReceiver( ctx context.Context, set receiver.Settings, config component.Config, consumer consumer.Metrics, ) (receiver.Metrics, error) { genConfig := config.(*Config) parser := pmetric.JSONUnmarshaler{} var err error sampleMetrics := demoMetrics if genConfig.Metrics.JsonlFile != "" { sampleMetrics, err = os.ReadFile(string(genConfig.Metrics.JsonlFile)) if err != nil { return nil, err } } var items []pmetric.Metrics scanner := bufio.NewScanner(bytes.NewReader(sampleMetrics)) scanner.Buffer(make([]byte, 0, maxScannerBufSize), maxScannerBufSize) for scanner.Scan() { metricBytes := scanner.Bytes() lineMetrics, err := parser.UnmarshalMetrics(metricBytes) if err != nil { return nil, err } items = append(items, lineMetrics) } if err := scanner.Err(); err != nil { return nil, err } return &metricsGenerator{ cfg: genConfig, logger: set.Logger, consumer: consumer, samples: list.NewLoopingList(items, genConfig.Metrics.MaxReplay), }, nil } func (ar *metricsGenerator) Start(ctx context.Context, _ component.Host) error { startCtx, cancelFn := context.WithCancel(ctx) ar.cancelFn = cancelFn for i := 0; i < ar.cfg.Concurrency; i++ { ar.inflightConcurrency.Add(1) go func() { defer ar.inflightConcurrency.Done() next := pmetric.NewMetrics() // per-worker temporary container to avoid allocs for { select { case <-startCtx.Done(): return default: } if next.IsReadOnly() { // As the optimization to reuse pdata is not compatible with fanoutconsumer, // i.e. in pipelines where there are more than 1 consumer, // as fanoutconsumer will mark the pdata struct as read only and cannot be reused. // See https://github.com/open-telemetry/opentelemetry-collector/blob/461a3558086a03ab13ea121d12e28e185a1c79b0/internal/fanoutconsumer/logs.go#L70 next = pmetric.NewMetrics() } err := ar.nextMetrics(next) if errors.Is(err, list.ErrLoopLimitReached) { return } // For graceful shutdown, use ctx instead of startCtx to shield Consume* from context canceled // In other words, Consume* will finish at its own pace, which may take indefinitely long. if err := ar.consumer.ConsumeMetrics(ctx, next); err != nil { ar.logger.Error(err.Error()) ar.statsMu.Lock() ar.stats.FailedRequests++ ar.stats.FailedMetricDataPoints += next.DataPointCount() ar.statsMu.Unlock() } else { ar.statsMu.Lock() ar.stats.Requests++ ar.stats.MetricDataPoints += next.DataPointCount() ar.statsMu.Unlock() } } }() } go func() { ar.inflightConcurrency.Wait() if ar.cfg.Metrics.doneCh != nil { ar.cfg.Metrics.doneCh <- ar.stats } }() return nil } func (ar *metricsGenerator) Shutdown(context.Context) error { if ar.cancelFn != nil { ar.cancelFn() } ar.inflightConcurrency.Wait() return nil } func (ar *metricsGenerator) nextMetrics(next pmetric.Metrics) error { now := pcommon.NewTimestampFromTime(time.Now()) sample, err := ar.samples.Next() if err != nil { return err } sample.CopyTo(next) counter := ar.counter.Add(1) rm := next.ResourceMetrics() for i := 0; i < rm.Len(); i++ { if ar.cfg.Metrics.AddCounterAttr { rm.At(i).Resource().Attributes().PutInt(counterAttr, counter) } for j := 0; j < rm.At(i).ScopeMetrics().Len(); j++ { for k := 0; k < rm.At(i).ScopeMetrics().At(j).Metrics().Len(); k++ { smetric := rm.At(i).ScopeMetrics().At(j).Metrics().At(k) switch smetric.Type() { case pmetric.MetricTypeGauge: dps := smetric.Gauge().DataPoints() for i := 0; i < dps.Len(); i++ { dps.At(i).SetTimestamp(now) dps.At(i).SetStartTimestamp(now) } case pmetric.MetricTypeSum: dps := smetric.Sum().DataPoints() for i := 0; i < dps.Len(); i++ { dps.At(i).SetTimestamp(now) dps.At(i).SetStartTimestamp(now) } case pmetric.MetricTypeHistogram: dps := smetric.Histogram().DataPoints() for i := 0; i < dps.Len(); i++ { dps.At(i).SetTimestamp(now) dps.At(i).SetStartTimestamp(now) } case pmetric.MetricTypeExponentialHistogram: dps := smetric.ExponentialHistogram().DataPoints() for i := 0; i < dps.Len(); i++ { dps.At(i).SetTimestamp(now) dps.At(i).SetStartTimestamp(now) } case pmetric.MetricTypeSummary: dps := smetric.Summary().DataPoints() for i := 0; i < dps.Len(); i++ { dps.At(i).SetTimestamp(now) dps.At(i).SetStartTimestamp(now) } case pmetric.MetricTypeEmpty: } } } } return nil }