aggregators/internal/hdrhistogram/hdrhistogram.go (281 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. // The MIT License (MIT) // // Copyright (c) 2014 Coda Hale // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. // Package hdrhistogram provides an optimized histogram for sparse samples. // This is a stop gap measure until we have [packed histogram implementation](https://www.javadoc.io/static/org.hdrhistogram/HdrHistogram/2.1.12/org/HdrHistogram/PackedHistogram.html). package hdrhistogram import ( "fmt" "math" "math/bits" "slices" "time" ) const ( lowestTrackableValue = 1 highestTrackableValue = 3.6e+9 // 1 hour in microseconds significantFigures = 2 // We scale transaction counts in the histogram, which only permits storing // integer counts, to allow for fractional transactions due to sampling. // // e.g. if the sampling rate is 0.4, then each sampled transaction has a // representative count of 2.5 (1/0.4). If we receive two such transactions // we will record a count of 5000 (2 * 2.5 * histogramCountScale). When we // publish metrics, we will scale down to 5 (5000 / histogramCountScale). histogramCountScale = 1000 ) var ( unitMagnitude = getUnitMagnitude() bucketCount = getBucketCount() subBucketCount = getSubBucketCount() subBucketHalfCountMagnitude = getSubBucketHalfCountMagnitude() subBucketHalfCount = getSubBucketHalfCount() subBucketMask = getSubBucketMask() countsLen = getCountsLen() ) // HistogramRepresentation is an optimization over HDR histogram mainly useful // for recording values clustered in some range rather than distributed over // the full range of the HDR histogram. It is based on the [hdrhistogram-go](https://github.com/HdrHistogram/hdrhistogram-go) package. // The package is not safe for concurrent usage, use an external lock // protection if required. type HistogramRepresentation struct { LowestTrackableValue int64 HighestTrackableValue int64 SignificantFigures int64 CountsRep HybridCountsRep } // New returns a new instance of HistogramRepresentation func New() *HistogramRepresentation { return &HistogramRepresentation{ LowestTrackableValue: lowestTrackableValue, HighestTrackableValue: highestTrackableValue, SignificantFigures: significantFigures, } } // RecordDuration records duration in the histogram representation. It // supports recording float64 upto 3 decimal places. This is achieved // by scaling the count. func (h *HistogramRepresentation) RecordDuration(d time.Duration, n float64) error { count := int64(math.Round(n * histogramCountScale)) v := d.Microseconds() return h.RecordValues(v, count) } // RecordValues records values in the histogram representation. func (h *HistogramRepresentation) RecordValues(v, n int64) error { idx := h.countsIndexFor(v) if idx < 0 || int32(countsLen) <= idx { return fmt.Errorf("value %d is too large to be recorded", v) } h.CountsRep.Add(idx, n) return nil } // Merge merges the provided histogram representation. // TODO: Add support for migration from a histogram representation // with different parameters. func (h *HistogramRepresentation) Merge(from *HistogramRepresentation) { if from == nil { return } from.CountsRep.ForEach(func(bucket int32, value int64) { h.CountsRep.Add(bucket, value) }) } // Buckets converts the histogram into ordered slices of counts // and values per bar along with the total count. func (h *HistogramRepresentation) Buckets() (uint64, []uint64, []float64) { counts := make([]uint64, 0, h.CountsRep.Len()) values := make([]float64, 0, h.CountsRep.Len()) var totalCount uint64 var prevBucket int32 iter := h.iterator() iter.nextCountAtIdx() h.CountsRep.ForEach(func(bucket int32, scaledCounts int64) { if scaledCounts <= 0 { return } if iter.advance(int(bucket - prevBucket)) { count := uint64(math.Round(float64(scaledCounts) / histogramCountScale)) counts = append(counts, count) values = append(values, float64(iter.highestEquivalentValue)) totalCount += count } prevBucket = bucket }) return totalCount, counts, values } func (h *HistogramRepresentation) countsIndexFor(v int64) int32 { bucketIdx := h.getBucketIndex(v) subBucketIdx := h.getSubBucketIdx(v, bucketIdx) return h.countsIndex(bucketIdx, subBucketIdx) } func (h *HistogramRepresentation) countsIndex(bucketIdx, subBucketIdx int32) int32 { baseBucketIdx := (bucketIdx + 1) << uint(subBucketHalfCountMagnitude) return baseBucketIdx + subBucketIdx - subBucketHalfCount } func (h *HistogramRepresentation) getBucketIndex(v int64) int32 { var pow2Ceiling = int64(64 - bits.LeadingZeros64(uint64(v|subBucketMask))) return int32(pow2Ceiling - int64(unitMagnitude) - int64(subBucketHalfCountMagnitude+1)) } func (h *HistogramRepresentation) getSubBucketIdx(v int64, idx int32) int32 { return int32(v >> uint(int64(idx)+int64(unitMagnitude))) } func (h *HistogramRepresentation) valueFromIndex(bucketIdx, subBucketIdx int32) int64 { return int64(subBucketIdx) << uint(bucketIdx+unitMagnitude) } func (h *HistogramRepresentation) highestEquivalentValue(v int64) int64 { return h.nextNonEquivalentValue(v) - 1 } func (h *HistogramRepresentation) nextNonEquivalentValue(v int64) int64 { bucketIdx := h.getBucketIndex(v) return h.lowestEquivalentValueGivenBucketIdx(v, bucketIdx) + h.sizeOfEquivalentValueRangeGivenBucketIdx(v, bucketIdx) } func (h *HistogramRepresentation) lowestEquivalentValueGivenBucketIdx(v int64, bucketIdx int32) int64 { subBucketIdx := h.getSubBucketIdx(v, bucketIdx) return h.valueFromIndex(bucketIdx, subBucketIdx) } func (h *HistogramRepresentation) sizeOfEquivalentValueRangeGivenBucketIdx(v int64, bucketIdx int32) int64 { subBucketIdx := h.getSubBucketIdx(v, bucketIdx) adjustedBucket := bucketIdx if subBucketIdx >= subBucketCount { adjustedBucket++ } return int64(1) << uint(unitMagnitude+adjustedBucket) } func (h *HistogramRepresentation) iterator() *iterator { return &iterator{ h: h, subBucketIdx: -1, } } type iterator struct { h *HistogramRepresentation bucketIdx, subBucketIdx int32 valueFromIdx int64 highestEquivalentValue int64 } // advance advances the iterator by count func (i *iterator) advance(count int) bool { for c := 0; c < count; c++ { if !i.nextCountAtIdx() { return false } } i.highestEquivalentValue = i.h.highestEquivalentValue(i.valueFromIdx) return true } func (i *iterator) nextCountAtIdx() bool { // increment bucket i.subBucketIdx++ if i.subBucketIdx >= subBucketCount { i.subBucketIdx = subBucketHalfCount i.bucketIdx++ } if i.bucketIdx >= bucketCount { return false } i.valueFromIdx = i.h.valueFromIndex(i.bucketIdx, i.subBucketIdx) return true } func getSubBucketHalfCountMagnitude() int32 { largetValueWithSingleUnitResolution := 2 * math.Pow10(significantFigures) subBucketCountMagnitude := int32(math.Ceil(math.Log2( largetValueWithSingleUnitResolution, ))) if subBucketCountMagnitude < 1 { return 0 } return subBucketCountMagnitude - 1 } func getUnitMagnitude() int32 { unitMag := int32(math.Floor(math.Log2( lowestTrackableValue, ))) if unitMag < 0 { return 0 } return unitMag } func getSubBucketCount() int32 { return int32(math.Pow(2, float64(getSubBucketHalfCountMagnitude()+1))) } func getSubBucketHalfCount() int32 { return getSubBucketCount() / 2 } func getSubBucketMask() int64 { return int64(getSubBucketCount()-1) << uint(getUnitMagnitude()) } func getCountsLen() int64 { return int64((getBucketCount() + 1) * (getSubBucketCount() / 2)) } func getBucketCount() int32 { smallestUntrackableValue := int64(getSubBucketCount()) << uint(getUnitMagnitude()) bucketsNeeded := int32(1) for smallestUntrackableValue < highestTrackableValue { if smallestUntrackableValue > (math.MaxInt64 / 2) { // next shift will overflow, meaning that bucket could // represent values up to ones greater than math.MaxInt64, // so it's the last bucket return bucketsNeeded + 1 } smallestUntrackableValue <<= 1 bucketsNeeded++ } return bucketsNeeded } // bar represents a bar of histogram. Each bar has a bucket, representing // where the bar belongs to in the histogram range, and the count of values // in each bucket. type bar struct { Bucket int32 Count int64 } // HybridCountsRep represents a hybrid counts representation for // sparse histogram. It is optimized to record a single value as // integer type and more values as map. type HybridCountsRep struct { bucket int32 value int64 s []bar } // Add adds a new value to a bucket of given index. func (c *HybridCountsRep) Add(bucket int32, value int64) { if c.s == nil && c.bucket == 0 && c.value == 0 { c.bucket = bucket c.value = value return } if c.s == nil { // automatic promotion to slice c.s = make([]bar, 0, 128) // TODO: Use pool c.s = slices.Insert(c.s, 0, bar{Bucket: c.bucket, Count: c.value}) c.bucket, c.value = 0, 0 } at, found := slices.BinarySearchFunc(c.s, bar{Bucket: bucket}, compareBar) if found { c.s[at].Count += value return } c.s = slices.Insert(c.s, at, bar{Bucket: bucket, Count: value}) } // ForEach iterates over each bucket and calls the given function. func (c *HybridCountsRep) ForEach(f func(int32, int64)) { if c.s == nil && (c.bucket != 0 || c.value != 0) { f(c.bucket, c.value) return } for i := range c.s { f(c.s[i].Bucket, c.s[i].Count) } } // Len returns the number of buckets currently recording. func (c *HybridCountsRep) Len() int { if c.s != nil { return len(c.s) } if c.bucket != 0 || c.value != 0 { return 1 } return 0 } // Get returns the count of values in a given bucket along with a bool // which is false if the bucket is not found. func (c *HybridCountsRep) Get(bucket int32) (int64, bool) { if c.s == nil { if c.bucket == bucket { return c.value, true } return 0, false } at, found := slices.BinarySearchFunc(c.s, bar{Bucket: bucket}, compareBar) if found { return c.s[at].Count, true } return 0, false } // Reset resets the values recorded. func (c *HybridCountsRep) Reset() { c.bucket = 0 c.value = 0 c.s = c.s[:0] } // Equal returns true if same bucket and count is recorded in both. func (c *HybridCountsRep) Equal(h *HybridCountsRep) bool { if c.Len() != h.Len() { return false } if c.Len() == 0 { return true } equal := true c.ForEach(func(bucket int32, value1 int64) { value2, ok := h.Get(bucket) if !ok || value1 != value2 { equal = false } }) return equal } func compareBar(a, b bar) int { if a.Bucket == b.Bucket { return 0 } if a.Bucket > b.Bucket { return 1 } return -1 }