cmd/otelinmemexporter/histogram.go (115 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package otelinmemexporter import ( "cmp" "math" "slices" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" ) type explicitBucket struct { UpperBound float64 // inclusive Count uint64 // beware that this will be converted to float64 in calculations } // explicitBucketsFromHistogramDataPoint extracts buckets from histogram data point. // The length of bucket counts in `dp` is assumed to be the length of explicit bounds plus one. // Violating this assumption will cause index-out-of-range panic. func explicitBucketsFromHistogramDataPoint(dp pmetric.HistogramDataPoint) []explicitBucket { bucketCounts := dp.BucketCounts().AsRaw() explicitBounds := dp.ExplicitBounds().AsRaw() buckets := make([]explicitBucket, 0, len(bucketCounts)) for i, ub := range explicitBounds { buckets = append(buckets, explicitBucket{ UpperBound: ub, Count: bucketCounts[i], }) } buckets = append(buckets, explicitBucket{ UpperBound: math.Inf(+1), Count: bucketCounts[len(bucketCounts)-1], }) return buckets } // deltaExplicitBucketsQuantile calculates the quantile `q` based on the given buckets. // The buckets are assumed to be of delta temporality (as opposed to cumulative) and // consist of non-overlapping contiguous bounds, e.g. `(-Inf, 0], (0, 10], (10, +Inf]`. // The buckets can be unsorted since the function will sort the buckets based on upper // bound prior to calculation. The buckets must also have the highest bound of +Inf. // // The quantile value is interpolated assuming a linear distribution within a bucket. // However, if the quantile falls into the highest bucket i.e. `(x, +Inf]`, the lower bound of that // bucket (x) is returned . On the other hand, if the quantile falls into the lowest bucket // i.e. `(-Inf, y]`, the upper bound of that bucket (y) is returned. // // Here are some special cases: // - If `q` == NaN, NaN is returned. // - If `q` < 0, -Inf is returned. // - If `q` > 1, +Inf is returned. // - If `buckets` has fewer than 2 elements, NaN is returned. // - If the highest bucket is not +Inf, NaN is returned. // - If `buckets` has 0 observations, NaN is returned. func deltaExplicitBucketsQuantile(q float64, buckets []explicitBucket) float64 { if math.IsNaN(q) { return math.NaN() } if q < 0 { return math.Inf(-1) } if q > 1 { return math.Inf(+1) } slices.SortFunc(buckets, func(a, b explicitBucket) int { // We don't expect the bucket boundary to be a NaN. return cmp.Compare(a.UpperBound, b.UpperBound) }) if len(buckets) < 2 { return math.NaN() } // The highest bound must be +Inf, and the lowest bound must be -Inf. if !math.IsInf(buckets[len(buckets)-1].UpperBound, +1) { return math.NaN() } // Check if there are any observations. var observations uint64 for _, bucket := range buckets { observations += bucket.Count } if observations == 0 { return math.NaN() } // Find the bucket that the quantile falls into. rank := q * float64(observations) var countSoFar uint64 bucketIdx := slices.IndexFunc(buckets, func(bucket explicitBucket) bool { countSoFar += bucket.Count // Compare using `>=` instead of `>` since upper bound is inclusive. return float64(countSoFar) >= rank }) if bucketIdx == len(buckets)-1 { return buckets[len(buckets)-2].UpperBound } if bucketIdx == 0 { return buckets[0].UpperBound } // Interpolate to get quantile in bucket. bucketStart := buckets[bucketIdx-1].UpperBound bucketEnd := buckets[bucketIdx].UpperBound bucketCount := buckets[bucketIdx].Count // How the bucket quantile is derived: // ==|=======|=======|=======|=======|== // | | | | | // | b - 2 | b - 1 | b | b + 1 | // ==|=======|=======|=======|=======|== // ----------------------> rank // --------------------------> countSoFar // |-------> bucketCount // |---> rank - (countSoFar - bucketCount) bucketQuantile := (rank - float64(countSoFar-bucketCount)) / float64(bucketCount) return bucketStart + (bucketEnd-bucketStart)*bucketQuantile } func float64SliceEqual(a, b pcommon.Float64Slice) bool { if a.Len() != b.Len() { return false } for i := 0; i < a.Len(); i++ { if a.At(i) != b.At(i) { return false } } return true } // addHistogramDataPoint adds the data from histogram data point `from` into `to`. // This data includes: // - count // - sum // - min (if `to` has min and `from` min is lower) // - max (if `to` has max and `from` max is higher) // - bucket counts (assumes that both data points have same explicit bounds) // - exemplars // - start timestamp (if `from` start timestamp is lower) // - timestamp (if `from` timestamp is lower) // // Note: If both `from` and `to` does not have the same explicit bounds, `from` will simply // replace `to` instead of being added. func addHistogramDataPoint(from, to pmetric.HistogramDataPoint) { if from.Count() == 0 { // `from` is empty, do nothing. return } if to.Count() == 0 { // `to` is new, simply copy over. from.CopyTo(to) return } if !float64SliceEqual(from.ExplicitBounds(), to.ExplicitBounds()) { // Mismatched explicit bounds, replace observations since we can't simply merge. from.CopyTo(to) return } to.SetCount(to.Count() + from.Count()) to.SetSum(to.Sum() + from.Sum()) // Overwrite min if lower. if to.HasMin() && to.Min() > from.Min() { to.SetMin(from.Min()) } // Overwrite max if higher. if to.HasMax() && to.Max() < from.Max() { to.SetMax(from.Max()) } // Merge buckets. bucketCounts := to.BucketCounts() for b := 0; b < from.BucketCounts().Len(); b++ { bucketCounts.SetAt(b, bucketCounts.At(b)+from.BucketCounts().At(b)) } from.Exemplars().MoveAndAppendTo(to.Exemplars()) // Overwrite start timestamp if lower. if from.StartTimestamp() < to.StartTimestamp() { to.SetStartTimestamp(from.StartTimestamp()) } // Overwrite timestamp if higher. if from.Timestamp() > to.Timestamp() { to.SetTimestamp(from.Timestamp()) } }