connector/spanmetricsconnector/internal/metrics/metrics.go (270 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector/internal/metrics" import ( "sort" "github.com/lightstep/go-expohisto/structure" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" ) type Key string type HistogramMetrics interface { GetOrCreate(key Key, attributes pcommon.Map) Histogram BuildMetrics(pmetric.Metric, generateStartTimestamp, pcommon.Timestamp, pmetric.AggregationTemporality) ClearExemplars() } type Histogram interface { Observe(value float64) AddExemplar(traceID pcommon.TraceID, spanID pcommon.SpanID, value float64) } type explicitHistogramMetrics struct { metrics map[Key]*explicitHistogram bounds []float64 maxExemplarCount *int } type exponentialHistogramMetrics struct { metrics map[Key]*exponentialHistogram maxSize int32 maxExemplarCount *int } type explicitHistogram struct { attributes pcommon.Map exemplars pmetric.ExemplarSlice bucketCounts []uint64 count uint64 sum float64 bounds []float64 maxExemplarCount *int } type exponentialHistogram struct { attributes pcommon.Map exemplars pmetric.ExemplarSlice histogram *structure.Histogram[float64] maxExemplarCount *int } type generateStartTimestamp = func(Key) pcommon.Timestamp func NewExponentialHistogramMetrics(maxSize int32, maxExemplarCount *int) HistogramMetrics { return &exponentialHistogramMetrics{ metrics: make(map[Key]*exponentialHistogram), maxSize: maxSize, maxExemplarCount: maxExemplarCount, } } func NewExplicitHistogramMetrics(bounds []float64, maxExemplarCount *int) HistogramMetrics { return &explicitHistogramMetrics{ metrics: make(map[Key]*explicitHistogram), bounds: bounds, maxExemplarCount: maxExemplarCount, } } func (m *explicitHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map) Histogram { h, ok := m.metrics[key] if !ok { h = &explicitHistogram{ attributes: attributes, exemplars: pmetric.NewExemplarSlice(), bounds: m.bounds, bucketCounts: make([]uint64, len(m.bounds)+1), maxExemplarCount: m.maxExemplarCount, } m.metrics[key] = h } return h } func (m *explicitHistogramMetrics) BuildMetrics( metric pmetric.Metric, startTimestamp generateStartTimestamp, timestamp pcommon.Timestamp, temporality pmetric.AggregationTemporality, ) { metric.SetEmptyHistogram().SetAggregationTemporality(temporality) dps := metric.Histogram().DataPoints() dps.EnsureCapacity(len(m.metrics)) for k, h := range m.metrics { dp := dps.AppendEmpty() dp.SetStartTimestamp(startTimestamp(k)) dp.SetTimestamp(timestamp) dp.ExplicitBounds().FromRaw(h.bounds) dp.BucketCounts().FromRaw(h.bucketCounts) dp.SetCount(h.count) dp.SetSum(h.sum) for i := 0; i < h.exemplars.Len(); i++ { h.exemplars.At(i).SetTimestamp(timestamp) } h.exemplars.CopyTo(dp.Exemplars()) h.attributes.CopyTo(dp.Attributes()) } } func (m *explicitHistogramMetrics) ClearExemplars() { for _, h := range m.metrics { h.exemplars = pmetric.NewExemplarSlice() } } func (m *exponentialHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map) Histogram { h, ok := m.metrics[key] if !ok { histogram := new(structure.Histogram[float64]) cfg := structure.NewConfig( structure.WithMaxSize(m.maxSize), ) histogram.Init(cfg) h = &exponentialHistogram{ histogram: histogram, attributes: attributes, exemplars: pmetric.NewExemplarSlice(), maxExemplarCount: m.maxExemplarCount, } m.metrics[key] = h } return h } func (m *exponentialHistogramMetrics) BuildMetrics( metric pmetric.Metric, startTimestamp generateStartTimestamp, timestamp pcommon.Timestamp, temporality pmetric.AggregationTemporality, ) { metric.SetEmptyExponentialHistogram().SetAggregationTemporality(temporality) dps := metric.ExponentialHistogram().DataPoints() dps.EnsureCapacity(len(m.metrics)) for k, m := range m.metrics { dp := dps.AppendEmpty() dp.SetStartTimestamp(startTimestamp(k)) dp.SetTimestamp(timestamp) expoHistToExponentialDataPoint(m.histogram, dp) for i := 0; i < m.exemplars.Len(); i++ { m.exemplars.At(i).SetTimestamp(timestamp) } m.exemplars.CopyTo(dp.Exemplars()) m.attributes.CopyTo(dp.Attributes()) } } // expoHistToExponentialDataPoint copies `lightstep/go-expohisto` structure.Histogram to // pmetric.ExponentialHistogramDataPoint func expoHistToExponentialDataPoint(agg *structure.Histogram[float64], dp pmetric.ExponentialHistogramDataPoint) { dp.SetCount(agg.Count()) dp.SetSum(agg.Sum()) if agg.Count() != 0 { dp.SetMin(agg.Min()) dp.SetMax(agg.Max()) } dp.SetZeroCount(agg.ZeroCount()) dp.SetScale(agg.Scale()) for _, half := range []struct { inFunc func() *structure.Buckets outFunc func() pmetric.ExponentialHistogramDataPointBuckets }{ {agg.Positive, dp.Positive}, {agg.Negative, dp.Negative}, } { in := half.inFunc() out := half.outFunc() out.SetOffset(in.Offset()) out.BucketCounts().EnsureCapacity(int(in.Len())) for i := uint32(0); i < in.Len(); i++ { out.BucketCounts().Append(in.At(i)) } } } func (m *exponentialHistogramMetrics) ClearExemplars() { for _, m := range m.metrics { m.exemplars = pmetric.NewExemplarSlice() } } func (h *explicitHistogram) Observe(value float64) { h.sum += value h.count++ // Binary search to find the value bucket index. index := sort.SearchFloat64s(h.bounds, value) h.bucketCounts[index]++ } func (h *explicitHistogram) AddExemplar(traceID pcommon.TraceID, spanID pcommon.SpanID, value float64) { if h.maxExemplarCount != nil && h.exemplars.Len() >= *h.maxExemplarCount { return } e := h.exemplars.AppendEmpty() e.SetTraceID(traceID) e.SetSpanID(spanID) e.SetDoubleValue(value) } func (h *exponentialHistogram) Observe(value float64) { h.histogram.Update(value) } func (h *exponentialHistogram) AddExemplar(traceID pcommon.TraceID, spanID pcommon.SpanID, value float64) { if h.maxExemplarCount != nil && h.exemplars.Len() >= *h.maxExemplarCount { return } e := h.exemplars.AppendEmpty() e.SetTraceID(traceID) e.SetSpanID(spanID) e.SetDoubleValue(value) } type Sum struct { attributes pcommon.Map count uint64 exemplars pmetric.ExemplarSlice maxExemplarCount *int // isFirst is used to track if this datapoint is new to the Sum. This // is used to ensure that new Sum metrics being with 0, and then are incremented // to the desired value. This avoids Prometheus throwing away the first // value in the series, due to the transition from null -> x. isFirst bool } func (s *Sum) Add(value uint64) { s.count += value } func NewSumMetrics(maxExemplarCount *int) SumMetrics { return SumMetrics{ metrics: make(map[Key]*Sum), maxExemplarCount: maxExemplarCount, } } type SumMetrics struct { metrics map[Key]*Sum maxExemplarCount *int } func (m *SumMetrics) GetOrCreate(key Key, attributes pcommon.Map) *Sum { s, ok := m.metrics[key] if !ok { s = &Sum{ attributes: attributes, exemplars: pmetric.NewExemplarSlice(), maxExemplarCount: m.maxExemplarCount, isFirst: true, } m.metrics[key] = s } return s } func (s *Sum) AddExemplar(traceID pcommon.TraceID, spanID pcommon.SpanID, value float64) { if s.maxExemplarCount != nil && s.exemplars.Len() >= *s.maxExemplarCount { return } e := s.exemplars.AppendEmpty() e.SetTraceID(traceID) e.SetSpanID(spanID) e.SetDoubleValue(value) } func (m *SumMetrics) BuildMetrics( metric pmetric.Metric, startTimestamp generateStartTimestamp, timestamp pcommon.Timestamp, temporality pmetric.AggregationTemporality, ) { metric.SetEmptySum().SetIsMonotonic(true) metric.Sum().SetAggregationTemporality(temporality) dps := metric.Sum().DataPoints() dps.EnsureCapacity(len(m.metrics)) for k, s := range m.metrics { dp := dps.AppendEmpty() dp.SetStartTimestamp(startTimestamp(k)) dp.SetTimestamp(timestamp) if s.isFirst { dp.SetIntValue(0) s.isFirst = false } else { dp.SetIntValue(int64(s.count)) } for i := 0; i < s.exemplars.Len(); i++ { s.exemplars.At(i).SetTimestamp(timestamp) } s.exemplars.CopyTo(dp.Exemplars()) s.attributes.CopyTo(dp.Attributes()) } } func (m *SumMetrics) ClearExemplars() { for _, sum := range m.metrics { sum.exemplars = pmetric.NewExemplarSlice() } }