aggregators/cachedeventsmap.go (53 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package aggregators
import (
"math"
"sync"
"sync/atomic"
"time"
)
// cachedEventsMap holds a counts of cached events, keyed by interval and ID.
// Cached events are events that have been processed by Aggregate methods,
// but which haven't yet been harvested. Event counts are fractional because
// an event may be spread over multiple partitions.
//
// Access to the map is protected with a mutex. During harvest, an exclusive
// (write) lock is held. Concurrent aggregations may perform atomic updates
// to the map, and the harvester may assume that the map will not be modified
// while it is reading it.
type cachedEventsMap struct {
// (interval, id) -> count
m sync.Map
countPool sync.Pool
}
func (m *cachedEventsMap) loadAndDelete(end time.Time) map[time.Duration]map[[16]byte]float64 {
loaded := make(map[time.Duration]map[[16]byte]float64)
m.m.Range(func(k, v any) bool {
key := k.(cachedEventsStatsKey)
if !end.Truncate(key.interval).Equal(end) {
return true
}
intervalMetrics, ok := loaded[key.interval]
if !ok {
intervalMetrics = make(map[[16]byte]float64)
loaded[key.interval] = intervalMetrics
}
vscaled := *v.(*uint64)
value := float64(vscaled / math.MaxUint16)
intervalMetrics[key.id] = value
m.m.Delete(k)
m.countPool.Put(v)
return true
})
return loaded
}
func (m *cachedEventsMap) add(interval time.Duration, id [16]byte, n float64) {
// We use a pool for the value to minimise allocations, as it will
// always escape to the heap through LoadOrStore.
nscaled, ok := m.countPool.Get().(*uint64)
if !ok {
nscaled = new(uint64)
}
// Scale by the maximum number of partitions to get an integer value,
// for simpler atomic operations.
*nscaled = uint64(n * math.MaxUint16)
key := cachedEventsStatsKey{interval: interval, id: id}
old, loaded := m.m.Load(key)
if !loaded {
old, loaded = m.m.LoadOrStore(key, nscaled)
if !loaded {
// Stored a new value.
return
}
}
atomic.AddUint64(old.(*uint64), *nscaled)
m.countPool.Put(nscaled)
}
type cachedEventsStatsKey struct {
interval time.Duration
id [16]byte
}