exporter/collector/internal/datapointstorage/datapointcache.go (251 lines of code) (raw):
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datapointstorage
import (
"hash"
"hash/fnv"
"sort"
"sync"
"time"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/atomic"
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
)
const gcInterval = 20 * time.Minute
type Cache struct {
numberCache map[uint64]usedNumberPoint
summaryCache map[uint64]usedSummaryPoint
histogramCache map[uint64]usedHistogramPoint
exponentialHistogramCache map[uint64]usedExponentialHistogramPoint
numberLock sync.RWMutex
summaryLock sync.RWMutex
histogramLock sync.RWMutex
exponentialHistogramLock sync.RWMutex
}
type usedNumberPoint struct {
point pmetric.NumberDataPoint
used *atomic.Bool
}
type usedSummaryPoint struct {
point pmetric.SummaryDataPoint
used *atomic.Bool
}
type usedHistogramPoint struct {
point pmetric.HistogramDataPoint
used *atomic.Bool
}
type usedExponentialHistogramPoint struct {
point pmetric.ExponentialHistogramDataPoint
used *atomic.Bool
}
// NewCache instantiates a cache and starts background processes.
func NewCache(shutdown <-chan struct{}) *Cache {
c := &Cache{
numberCache: make(map[uint64]usedNumberPoint),
summaryCache: make(map[uint64]usedSummaryPoint),
histogramCache: make(map[uint64]usedHistogramPoint),
exponentialHistogramCache: make(map[uint64]usedExponentialHistogramPoint),
}
go func() {
ticker := time.NewTicker(gcInterval)
//nolint:revive
for c.gc(shutdown, ticker.C) {
}
}()
return c
}
// GetNumberDataPoint retrieves the point associated with the identifier, and whether
// or not it was found.
func (c *Cache) GetNumberDataPoint(identifier uint64) (pmetric.NumberDataPoint, bool) {
c.numberLock.RLock()
defer c.numberLock.RUnlock()
point, found := c.numberCache[identifier]
if found {
point.used.Store(true)
}
return point.point, found
}
// SetNumberDataPoint assigns the point to the identifier in the cache.
func (c *Cache) SetNumberDataPoint(identifier uint64, point pmetric.NumberDataPoint) {
c.numberLock.Lock()
defer c.numberLock.Unlock()
if existing, ok := c.numberCache[identifier]; ok {
existing.used.Store(true)
minimalNumberDataPointCopyTo(point, existing.point)
return
}
newPoint := pmetric.NewNumberDataPoint()
minimalNumberDataPointCopyTo(point, newPoint)
c.numberCache[identifier] = usedNumberPoint{newPoint, atomic.NewBool(true)}
}
// GetSummaryDataPoint retrieves the point associated with the identifier, and whether
// or not it was found.
func (c *Cache) GetSummaryDataPoint(identifier uint64) (pmetric.SummaryDataPoint, bool) {
c.summaryLock.RLock()
defer c.summaryLock.RUnlock()
point, found := c.summaryCache[identifier]
if found {
point.used.Store(true)
}
return point.point, found
}
// SetSummaryDataPoint assigns the point to the identifier in the cache.
func (c *Cache) SetSummaryDataPoint(identifier uint64, point pmetric.SummaryDataPoint) {
c.summaryLock.Lock()
defer c.summaryLock.Unlock()
if existing, ok := c.summaryCache[identifier]; ok {
existing.used.Store(true)
minimalSummaryDataPointCopyTo(point, existing.point)
return
}
newPoint := pmetric.NewSummaryDataPoint()
minimalSummaryDataPointCopyTo(point, newPoint)
c.summaryCache[identifier] = usedSummaryPoint{newPoint, atomic.NewBool(true)}
}
// GetHistogramDataPoint retrieves the point associated with the identifier, and whether
// or not it was found.
func (c *Cache) GetHistogramDataPoint(identifier uint64) (pmetric.HistogramDataPoint, bool) {
c.histogramLock.RLock()
defer c.histogramLock.RUnlock()
point, found := c.histogramCache[identifier]
if found {
point.used.Store(true)
}
return point.point, found
}
// SetHistogramDataPoint assigns the point to the identifier in the cache.
func (c *Cache) SetHistogramDataPoint(identifier uint64, point pmetric.HistogramDataPoint) {
c.histogramLock.Lock()
defer c.histogramLock.Unlock()
if existing, ok := c.histogramCache[identifier]; ok {
existing.used.Store(true)
minimalHistogramDataPointCopyTo(point, existing.point)
return
}
newPoint := pmetric.NewHistogramDataPoint()
minimalHistogramDataPointCopyTo(point, newPoint)
c.histogramCache[identifier] = usedHistogramPoint{newPoint, atomic.NewBool(true)}
}
// GetExponentialHistogramDataPoint retrieves the point associated with the identifier, and whether
// or not it was found.
func (c *Cache) GetExponentialHistogramDataPoint(identifier uint64) (pmetric.ExponentialHistogramDataPoint, bool) {
c.exponentialHistogramLock.RLock()
defer c.exponentialHistogramLock.RUnlock()
point, found := c.exponentialHistogramCache[identifier]
if found {
point.used.Store(true)
}
return point.point, found
}
// SetExponentialHistogramDataPoint assigns the point to the identifier in the cache.
func (c *Cache) SetExponentialHistogramDataPoint(identifier uint64, point pmetric.ExponentialHistogramDataPoint) {
c.exponentialHistogramLock.Lock()
defer c.exponentialHistogramLock.Unlock()
if existing, ok := c.exponentialHistogramCache[identifier]; ok {
existing.used.Store(true)
minimalExponentialHistogramDataPointCopyTo(point, existing.point)
return
}
newPoint := pmetric.NewExponentialHistogramDataPoint()
minimalExponentialHistogramDataPointCopyTo(point, newPoint)
c.exponentialHistogramCache[identifier] = usedExponentialHistogramPoint{newPoint, atomic.NewBool(true)}
}
// gc garbage collects the cache after the ticker ticks.
func (c *Cache) gc(shutdown <-chan struct{}, tickerCh <-chan time.Time) bool {
select {
case <-shutdown:
return false
case <-tickerCh:
// garbage collect the numberCache
c.numberLock.Lock()
for id, point := range c.numberCache {
// for points that have been used, mark them as unused
if point.used.Load() {
point.used.Store(false)
} else {
// for points that have not been used, delete points
delete(c.numberCache, id)
}
}
c.numberLock.Unlock()
// garbage collect the summaryCache
c.summaryLock.Lock()
for id, point := range c.summaryCache {
// for points that have been used, mark them as unused
if point.used.Load() {
point.used.Store(false)
} else {
// for points that have not been used, delete points
delete(c.summaryCache, id)
}
}
c.summaryLock.Unlock()
// garbage collect the histogramCache
c.histogramLock.Lock()
for id, point := range c.histogramCache {
// for points that have been used, mark them as unused
if point.used.Load() {
point.used.Store(false)
} else {
// for points that have not been used, delete points
delete(c.histogramCache, id)
}
}
c.histogramLock.Unlock()
// garbage collect the exponentialHistogramCache
c.exponentialHistogramLock.Lock()
for id, point := range c.exponentialHistogramCache {
// for points that have been used, mark them as unused
if point.used.Load() {
point.used.Store(false)
} else {
// for points that have not been used, delete points
delete(c.exponentialHistogramCache, id)
}
}
c.exponentialHistogramLock.Unlock()
}
return true
}
// Use the same constants as Prometheus uses for hashing:
// https://github.com/prometheus/prometheus/blob/282fb1632ad62a82401a230f486538a72384faf0/model/labels/labels_common.go#L32
var (
itemSep = []byte{'\xfe'} // Used between identifiers
kvSep = []byte{'\xff'} // Used between map keys and values
)
// Identifier returns the unique string identifier for a metric.
func Identifier(resource *monitoredrespb.MonitoredResource, extraLabels map[string]string, metric pmetric.Metric, attributes pcommon.Map) uint64 {
h := fnv.New64()
h.Write([]byte(resource.GetType()))
h.Write(itemSep)
h.Write([]byte(metric.Name()))
h.Write(itemSep)
attrs := make(map[string]string)
attributes.Range(func(k string, v pcommon.Value) bool {
attrs[k] = v.AsString()
return true
})
hashOfMap(h, extraLabels)
h.Write(itemSep)
hashOfMap(h, attrs)
h.Write(itemSep)
hashOfMap(h, resource.GetLabels())
return h.Sum64()
}
func hashOfMap(h hash.Hash64, m map[string]string) {
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
sort.Strings(keys)
for _, key := range keys {
h.Write([]byte(key))
h.Write(kvSep)
h.Write([]byte(m[key]))
h.Write(kvSep)
}
}
// minimalSummaryDataPointCopyTo is the same as CopyTo for SummaryDataPoint,
// but only copies values required for normalization.
func minimalSummaryDataPointCopyTo(src, dest pmetric.SummaryDataPoint) {
// We do not copy attributes, start timestamp, flags, quantiles
dest.SetTimestamp(src.Timestamp())
dest.SetCount(src.Count())
dest.SetSum(src.Sum())
}
// minimalHistogramDataPointCopyTo is the same as CopyTo for
// HistogramDataPoint, but only copies values required for normalization.
func minimalHistogramDataPointCopyTo(src, dest pmetric.HistogramDataPoint) {
// We do not copy attributes, start timestamp, flags, exemplars, min, max
dest.SetTimestamp(src.Timestamp())
dest.SetCount(src.Count())
src.BucketCounts().CopyTo(dest.BucketCounts())
src.ExplicitBounds().CopyTo(dest.ExplicitBounds())
if src.HasSum() {
dest.SetSum(src.Sum())
}
}
// minimalExponentialHistogramDataPointCopyTo is the same as CopyTo for
// ExponentialHistogramDataPoint, but only copies values required for normalization.
func minimalExponentialHistogramDataPointCopyTo(src, dest pmetric.ExponentialHistogramDataPoint) {
// We do not copy attributes, start timestamp, flags, exemplars, min, max, zero threshold
dest.SetTimestamp(src.Timestamp())
dest.SetCount(src.Count())
dest.SetZeroCount(src.ZeroCount())
dest.SetScale(src.Scale())
src.Positive().CopyTo(dest.Positive())
src.Negative().CopyTo(dest.Negative())
if src.HasSum() {
dest.SetSum(src.Sum())
}
dest.SetZeroThreshold(src.ZeroThreshold())
}
// minimalNumberDataPointCopyTo is the same as CopyTo for NumberDataPoint,
// but only copies values required for normalization.
func minimalNumberDataPointCopyTo(src, dest pmetric.NumberDataPoint) {
// We do not copy attributes, start timestamp, flags, exemplars
dest.SetTimestamp(src.Timestamp())
switch src.ValueType() {
case pmetric.NumberDataPointValueTypeDouble:
dest.SetDoubleValue(src.DoubleValue())
case pmetric.NumberDataPointValueTypeInt:
dest.SetIntValue(src.IntValue())
}
}