receiver/statsdreceiver/internal/parser/metric_translator.go (145 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package parser // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/parser" import ( "sort" "time" "github.com/lightstep/go-expohisto/structure" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "gonum.org/v1/gonum/stat" ) var statsDDefaultPercentiles = []float64{0, 10, 50, 90, 95, 100} func buildCounterMetric(parsedMetric statsDMetric, isMonotonicCounter bool) pmetric.ScopeMetrics { ilm := pmetric.NewScopeMetrics() nm := ilm.Metrics().AppendEmpty() nm.SetName(parsedMetric.description.name) if parsedMetric.unit != "" { nm.SetUnit(parsedMetric.unit) } nm.SetEmptySum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) nm.Sum().SetIsMonotonic(isMonotonicCounter) dp := nm.Sum().DataPoints().AppendEmpty() dp.SetIntValue(parsedMetric.counterValue()) for i := parsedMetric.description.attrs.Iter(); i.Next(); { dp.Attributes().PutStr(string(i.Attribute().Key), i.Attribute().Value.AsString()) } if parsedMetric.timestamp != 0 { dp.SetTimestamp(pcommon.Timestamp(parsedMetric.timestamp)) } return ilm } func setTimestampsForCounterMetric(ilm pmetric.ScopeMetrics, startTime, timeNow time.Time) { dp := ilm.Metrics().At(0).Sum().DataPoints().At(0) dp.SetStartTimestamp(pcommon.NewTimestampFromTime(startTime)) if dp.Timestamp() == 0 { dp.SetTimestamp(pcommon.NewTimestampFromTime(timeNow)) } } func buildGaugeMetric(parsedMetric statsDMetric, timeNow time.Time) pmetric.ScopeMetrics { ilm := pmetric.NewScopeMetrics() nm := ilm.Metrics().AppendEmpty() nm.SetName(parsedMetric.description.name) if parsedMetric.unit != "" { nm.SetUnit(parsedMetric.unit) } dp := nm.SetEmptyGauge().DataPoints().AppendEmpty() dp.SetDoubleValue(parsedMetric.gaugeValue()) dp.SetTimestamp(pcommon.NewTimestampFromTime(timeNow)) for i := parsedMetric.description.attrs.Iter(); i.Next(); { dp.Attributes().PutStr(string(i.Attribute().Key), i.Attribute().Value.AsString()) } return ilm } func buildSummaryMetric(desc statsDMetricDescription, summary summaryMetric, startTime, timeNow time.Time, percentiles []float64, ilm pmetric.ScopeMetrics) { nm := ilm.Metrics().AppendEmpty() nm.SetName(desc.name) dp := nm.SetEmptySummary().DataPoints().AppendEmpty() count := float64(0) sum := float64(0) for i := range summary.points { c := summary.weights[i] count += c sum += summary.points[i] * c } // Note: count is rounded here, see note in counterValue(). dp.SetCount(uint64(count)) dp.SetSum(sum) dp.SetStartTimestamp(pcommon.NewTimestampFromTime(startTime)) dp.SetTimestamp(pcommon.NewTimestampFromTime(timeNow)) for i := desc.attrs.Iter(); i.Next(); { dp.Attributes().PutStr(string(i.Attribute().Key), i.Attribute().Value.AsString()) } sort.Sort(dualSorter{summary.points, summary.weights}) for _, pct := range percentiles { eachQuantile := dp.QuantileValues().AppendEmpty() eachQuantile.SetQuantile(pct / 100) eachQuantile.SetValue(stat.Quantile(pct/100, stat.Empirical, summary.points, summary.weights)) } } func buildHistogramMetric(desc statsDMetricDescription, histogram histogramMetric, startTime, timeNow time.Time, ilm pmetric.ScopeMetrics) { nm := ilm.Metrics().AppendEmpty() nm.SetName(desc.name) expo := nm.SetEmptyExponentialHistogram() expo.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) dp := expo.DataPoints().AppendEmpty() agg := histogram.agg dp.SetCount(agg.Count()) dp.SetSum(agg.Sum()) if agg.Count() != 0 { dp.SetMin(agg.Min()) dp.SetMax(agg.Max()) } dp.SetStartTimestamp(pcommon.NewTimestampFromTime(startTime)) dp.SetTimestamp(pcommon.NewTimestampFromTime(timeNow)) for i := desc.attrs.Iter(); i.Next(); { dp.Attributes().PutStr(string(i.Attribute().Key), i.Attribute().Value.AsString()) } dp.SetZeroCount(agg.ZeroCount()) dp.SetScale(agg.Scale()) for _, half := range []struct { inFunc func() *structure.Buckets outFunc func() pmetric.ExponentialHistogramDataPointBuckets }{ {agg.Positive, dp.Positive}, {agg.Negative, dp.Negative}, } { in := half.inFunc() out := half.outFunc() out.SetOffset(in.Offset()) out.BucketCounts().EnsureCapacity(int(in.Len())) for i := uint32(0); i < in.Len(); i++ { out.BucketCounts().Append(in.At(i)) } } } func (s statsDMetric) counterValue() int64 { x := s.asFloat // Note statds counters are always represented as integers. // There is no statsd specification that says what should or // shouldn't be done here. Rounding may occur for sample // rates that are not integer reciprocals. Recommendation: // use integer reciprocal sampling rates. if 0 < s.sampleRate && s.sampleRate < 1 { x /= s.sampleRate } return int64(x) } func (s statsDMetric) gaugeValue() float64 { // sampleRate does not have effect for gauge points. return s.asFloat } func (s statsDMetric) sampleValue() sampleValue { count := 1.0 if 0 < s.sampleRate && s.sampleRate < 1 { count /= s.sampleRate } return sampleValue{ value: s.asFloat, count: count, } } type dualSorter struct { values, weights []float64 } func (d dualSorter) Len() int { return len(d.values) } func (d dualSorter) Swap(i, j int) { d.values[i], d.values[j] = d.values[j], d.values[i] d.weights[i], d.weights[j] = d.weights[j], d.weights[i] } func (d dualSorter) Less(i, j int) bool { return d.values[i] < d.values[j] }