metric/distribution/seh1/seh1_distribution.go (166 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: MIT package seh1 import ( "fmt" "log" "math" "go.opentelemetry.io/collector/pdata/pmetric" "github.com/aws/amazon-cloudwatch-agent/metric/distribution" ) var bucketForZero int16 = math.MinInt16 var bucketFactor = math.Log(1 + 0.1) type SEH1Distribution struct { maximum float64 minimum float64 sampleCount float64 sum float64 buckets map[int16]float64 // from bucket number (i.e. value) to the counter (i.e. weight) unit string } func NewSEH1Distribution() distribution.Distribution { return &SEH1Distribution{ maximum: 0, // negative number is not supported for now, so zero is the min value minimum: math.MaxFloat64, sampleCount: 0, sum: 0, buckets: map[int16]float64{}, unit: "", } } func (seh1Distribution *SEH1Distribution) Maximum() float64 { return seh1Distribution.maximum } func (seh1Distribution *SEH1Distribution) Minimum() float64 { return seh1Distribution.minimum } func (seh1Distribution *SEH1Distribution) SampleCount() float64 { return seh1Distribution.sampleCount } func (seh1Distribution *SEH1Distribution) Sum() float64 { return seh1Distribution.sum } func (seh1Distribution *SEH1Distribution) ValuesAndCounts() (values []float64, counts []float64) { values = []float64{} counts = []float64{} for bucketNumber, counter := range seh1Distribution.buckets { var value float64 if bucketNumber == bucketForZero { value = 0 } else { // Add 0.5 to calculate exponent for the middle of the bin value = math.Exp((float64(bucketNumber) + 0.5) * bucketFactor) } values = append(values, value) counts = append(counts, counter) } return } func (seh1Distribution *SEH1Distribution) Unit() string { return seh1Distribution.unit } func (seh1Distribution *SEH1Distribution) Size() int { return len(seh1Distribution.buckets) } // weight is 1/samplingRate func (seh1Distribution *SEH1Distribution) AddEntryWithUnit(value float64, weight float64, unit string) error { if weight <= 0 { return fmt.Errorf("unsupported weight %v: %w", weight, distribution.ErrUnsupportedWeight) } if !distribution.IsSupportedValue(value, 0, distribution.MaxValue) { return fmt.Errorf("unsupported value %v: %w", value, distribution.ErrUnsupportedValue) } //sample count seh1Distribution.sampleCount += weight //sum seh1Distribution.sum += value * weight //min if value < seh1Distribution.minimum { seh1Distribution.minimum = value } //max if value > seh1Distribution.maximum { seh1Distribution.maximum = value } //seh bucketNumber := bucketNumber(value) seh1Distribution.buckets[bucketNumber] += weight //unit if seh1Distribution.unit == "" { seh1Distribution.unit = unit } else if seh1Distribution.unit != unit && unit != "" { log.Printf("D! Multiple units are detected: %s, %s", seh1Distribution.unit, unit) } return nil } // weight is 1/samplingRate func (seh1Distribution *SEH1Distribution) AddEntry(value float64, weight float64) error { return seh1Distribution.AddEntryWithUnit(value, weight, "") } func (seh1Distribution *SEH1Distribution) AddDistribution(distribution distribution.Distribution) { seh1Distribution.AddDistributionWithWeight(distribution, 1) } func (seh1Distribution *SEH1Distribution) AddDistributionWithWeight(distribution distribution.Distribution, weight float64) { if distribution.SampleCount()*weight > 0 { //seh if fromSEH1Distribution, ok := distribution.(*SEH1Distribution); ok { for bucketNumber, bucketCounts := range fromSEH1Distribution.buckets { seh1Distribution.buckets[bucketNumber] += bucketCounts * weight } } else { log.Printf("E! The from distribution type is not compatible with the to distribution type: from distribution type %T, to distribution type %T", seh1Distribution, distribution) return } //sample count seh1Distribution.sampleCount += distribution.SampleCount() * weight //sum seh1Distribution.sum += distribution.Sum() * weight //min if distribution.Minimum() < seh1Distribution.minimum { seh1Distribution.minimum = distribution.Minimum() } //max if distribution.Maximum() > seh1Distribution.maximum { seh1Distribution.maximum = distribution.Maximum() } //unit if seh1Distribution.unit == "" { seh1Distribution.unit = distribution.Unit() } else if seh1Distribution.unit != distribution.Unit() && distribution.Unit() != "" { log.Printf("D! Multiple units are detected: %s, %s", seh1Distribution.unit, distribution.Unit()) } } else { log.Printf("D! SampleCount * Weight should be larger than 0: %v, %v", distribution.SampleCount(), weight) } } // ConvertToOtel could convert an SEH1Distribution to pmetric.ExponentialHistogram. // But there is no need because it will just get converted bak to a SEH1Distribution. func (sd *SEH1Distribution) ConvertToOtel(dp pmetric.HistogramDataPoint) { dp.SetMax(sd.maximum) dp.SetMin(sd.minimum) dp.SetCount(uint64(sd.sampleCount)) dp.SetSum(sd.sum) dp.ExplicitBounds().EnsureCapacity(len(sd.buckets)) dp.BucketCounts().EnsureCapacity(len(sd.buckets)) for k, v := range sd.buckets { dp.ExplicitBounds().Append(float64(k)) // Beware of potential loss of precision due to type conversion. dp.BucketCounts().Append(uint64(v)) } } func (sd *SEH1Distribution) ConvertFromOtel(dp pmetric.HistogramDataPoint, unit string) { sd.maximum = dp.Max() sd.minimum = dp.Min() sd.sampleCount = float64(dp.Count()) sd.sum = dp.Sum() sd.unit = unit for i := 0; i < dp.ExplicitBounds().Len(); i++ { k := dp.ExplicitBounds().At(i) v := dp.BucketCounts().At(i) sd.buckets[int16(k)] = float64(v) } } func (seh1Distribution *SEH1Distribution) CanAdd(value float64, sizeLimit int) bool { if seh1Distribution.Size() < sizeLimit { return true } bucketNumber := bucketNumber(value) if _, ok := seh1Distribution.buckets[bucketNumber]; ok { return true } return false } func bucketNumber(value float64) int16 { bucketNumber := bucketForZero if value > 0 { bucketNumber = int16(floor(math.Log(value) / bucketFactor)) } return bucketNumber } // This method is faster than math.Floor func floor(fvalue float64) int64 { ivalue := int64(fvalue) if fvalue < 0 && float64(ivalue) != fvalue { ivalue-- } return ivalue }