// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package otelinmemexporter
import (
"errors"
"fmt"
"sync"
"time"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// AggregationType defines the type of aggregation that the store
// performs on filtered metrics.
type AggregationType string
func (agg AggregationType) IsValid() bool {
return agg == Last || agg == Rate || agg == Sum || agg == Percentile
}
const (
Last AggregationType = "last" // only for number
Rate AggregationType = "rate" // only for number
Sum AggregationType = "sum"
Percentile AggregationType = "percentile" // only for histogram
)
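// metric constrains the pdata data point types that the store caches.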
type metric interface {
pmetric.NumberDataPoint | pmetric.HistogramDataPoint
}
type (
metricNameToAggConfigs map[string][]AggregationConfig
keyToAggConfig map[string]*AggregationConfig
keyToGroupToMetric[T metric] map[string]map[string]T
)
// Store is an in-memory data store for telemetry data. Data exported
// by the in-memory exporter is aggregated in the Store and can be
// queried from it. The Store only aggregates the specific set of
// entries configured at creation time.
type Store struct {
sync.RWMutex
nameM metricNameToAggConfigs
keyM keyToAggConfig
nums keyToGroupToMetric[pmetric.NumberDataPoint]
hists keyToGroupToMetric[pmetric.HistogramDataPoint]
logger *zap.Logger
}
// AggregationConfig defines the configuration for filtering,
// aggregating, and caching metrics in the store.
//
// Each metric with a specific name and set of label values MUST have
// a unique aggregation type, i.e. the store will only perform a
// single type of aggregation for a given combination of name and
// label values. If duplicate entries are provided, creating a new
// store returns an error.
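//
// For example, the following config (the metric, label, and group-by
// names are illustrative) computes a per-second rate of the metric
// "requests.total", grouped by the "service.name" attribute:
//
//	AggregationConfig{
//		Key:              "request_rate",
//		Name:             "requests.total",
//		MatchLabelValues: map[string]string{"outcome": "success"},
//		Type:             Rate,
//		GroupBy:          "service.name",
//	}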
type AggregationConfig struct {
// Key is used to describe the aggregated metrics produced by
// the specified aggregation config. Key must be unique across
// different aggregation configs.
Key string `mapstructure:"key"`
// Name specifies the metric name to include.
Name string `mapstructure:"name"`
// MatchLabelValues specifies a subset of attribute values that a
// metric must carry in order to be aggregated. All metrics whose
// attributes match this subset are aggregated together.
MatchLabelValues map[string]string `mapstructure:"match_label_values"`
// Type defines a type of aggregation that the store will
// perform on a filtered metric with the given name and
// label values. Only one type is allowed for a specific
// combination of name and label values.
Type AggregationType `mapstructure:"aggregation_type"`
// Percentile defines the aggregation percentile to use if
// Type is "percentile". It will be used for calculating the
// percentile of histograms. Must be in range `(0., 100.]`.
Percentile float64 `mapstructure:"percentile"`
// GroupBy allows grouping the metrics by a specific key. An
// empty value for group by signifies no grouping.
GroupBy string `mapstructure:"group_by"`
}
// MarshalLogObject implements zapcore.ObjectMarshaler to allow adding
// config to logging context.
func (cfg *AggregationConfig) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddString("name", cfg.Name)
enc.AddString("type", string(cfg.Type))
enc.AddString("key", cfg.Key)
enc.AddString("group_by", cfg.GroupBy)
enc.AddFloat64("percentile", cfg.Percentile)
enc.AddObject("label_values", zapcore.ObjectMarshalerFunc(
func(enc zapcore.ObjectEncoder) error {
for l, v := range cfg.MatchLabelValues {
enc.AddString("label", l)
enc.AddString("value", v)
}
return nil
},
))
return nil
}
func (cfg *AggregationConfig) validate() error {
if cfg.Name == "" {
return errors.New("aggregation config must have name")
}
if cfg.Key == "" {
return errors.New("aggregation config must have key")
}
if !cfg.Type.IsValid() {
return fmt.Errorf("aggregation config %s invalid type: %s", cfg.Key, cfg.Type)
}
if cfg.Type == Percentile {
if cfg.Percentile <= 0 || cfg.Percentile > 100 {
return fmt.Errorf("aggregation config %s invalid aggregation percentile %f", cfg.Key, cfg.Percentile)
}
}
return nil
}
func (cfg *AggregationConfig) isEqualAttrs(
name string,
attrs, resAttrs pcommon.Map,
) bool {
if cfg.Name != name {
return false
}
if len(cfg.MatchLabelValues) > (attrs.Len() + resAttrs.Len()) {
return false
}
for k, v := range cfg.MatchLabelValues {
targetV := getValueFromMaps(k, attrs, resAttrs)
if targetV.Type() == pcommon.ValueTypeEmpty || v != targetV.AsString() {
return false
}
}
return true
}
// NewStore creates a new in-memory metric store. It returns an error
// if the provided config is invalid.
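//
// A minimal usage sketch (the key and metric names are illustrative):
//
//	store, err := NewStore([]AggregationConfig{{
//		Key:  "total_requests",
//		Name: "requests.total",
//		Type: Sum,
//	}}, zap.NewNop())
//	if err != nil {
//		// handle invalid config
//	}
//	store.Add(metrics) // delta-temporality pmetric.Metrics
//	vals, _ := store.Get("total_requests")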
func NewStore(aggs []AggregationConfig, logger *zap.Logger) (*Store, error) {
keyM, nameM, err := validateAndGroupAggregationConfigs(aggs)
if err != nil {
return nil, err
}
return &Store{
keyM: keyM,
nameM: nameM,
logger: logger,
}, nil
}
// Add adds metrics to the store. The metrics must have delta
// temporality, otherwise they are ignored with a warning.
//
// Two kinds of metrics are supported, each with different aggregations:
//  1. pmetric.Sum / pmetric.Gauge (aggregations: Last, Sum, Rate)
//  2. pmetric.Histogram (aggregations: Percentile, Sum)
//
// Unsupported metric/aggregation combinations are ignored with a warning.
func (s *Store) Add(ld pmetric.Metrics) {
s.Lock()
defer s.Unlock()
rms := ld.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
rm := rms.At(i)
resAttrs := rm.Resource().Attributes()
sms := rm.ScopeMetrics()
for j := 0; j < sms.Len(); j++ {
ms := sms.At(j).Metrics()
for k := 0; k < ms.Len(); k++ {
s.add(ms.At(k), resAttrs)
}
}
}
}
// GetAll returns the aggregated values for all configured aggregation
// configs. If `GroupBy` is set on an aggregation config, its results
// are grouped by the observed values of the group-by key; ungrouped
// results use the empty string as their key.
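//
// For example, with a single config keyed "request_rate" and grouped
// by "service.name" (names are illustrative), the result could look
// like:
//
//	map[string]map[string]float64{
//		"request_rate": {"svc-a": 3.5, "svc-b": 0.25},
//	}
//
// Keys that have received no matching data map to {"": 0}.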
func (s *Store) GetAll() map[string]map[string]float64 {
s.RLock()
defer s.RUnlock()
m := make(map[string]map[string]float64, len(s.keyM))
for key, cfg := range s.keyM {
numDPByGrp, numExist := s.nums[key]
histDPByGrp, histExist := s.hists[key]
if !numExist && !histExist {
m[key] = map[string]float64{"": 0}
continue
}
m[key] = make(map[string]float64, len(numDPByGrp)+len(histDPByGrp))
for grp, dp := range numDPByGrp {
m[key][grp] = getNumAggByType(cfg.Type, dp)
}
for grp, dp := range histDPByGrp {
m[key][grp] = getHistAggByType(cfg.Type, cfg.Percentile, dp)
}
}
return m
}
// Get returns the aggregated values for a single configured aggregation
// config. If `GroupBy` is set on the aggregation config, the results
// are grouped by the observed values of the group-by key; ungrouped
// results use the empty string as their key.
func (s *Store) Get(key string) (map[string]float64, error) {
s.RLock()
defer s.RUnlock()
cfg, ok := s.keyM[key]
if !ok {
return nil, fmt.Errorf("key %s is not configured", key)
}
numDPByGrp, numExist := s.nums[key]
histDPByGrp, histExist := s.hists[key]
if !numExist && !histExist {
return map[string]float64{"": 0}, nil
}
m := make(map[string]float64, len(numDPByGrp)+len(histDPByGrp))
for k, dp := range numDPByGrp {
m[k] = getNumAggByType(cfg.Type, dp)
}
for k, dp := range histDPByGrp {
m[k] = getHistAggByType(cfg.Type, cfg.Percentile, dp)
}
return m, nil
}
// Reset resets the store by deleting all cached data.
func (s *Store) Reset() {
s.Lock()
defer s.Unlock()
for k := range s.nums {
delete(s.nums, k)
}
// Histogram data points are cached separately and must be cleared too.
for k := range s.hists {
delete(s.hists, k)
}
}
func (s *Store) add(m pmetric.Metric, resAttrs pcommon.Map) {
// Fail fast if no config matches the metric name.
_, ok := s.nameM[m.Name()]
if !ok {
s.logger.Debug(
"skipping metric, no config matched",
zap.String("name", m.Name()),
zap.String("type", m.Type().String()),
)
return
}
switch m.Type() {
case pmetric.MetricTypeGauge:
s.mergeNumberDataPoints(m.Name(), m.Gauge().DataPoints(), resAttrs)
case pmetric.MetricTypeSum:
if m.Sum().AggregationTemporality() == pmetric.AggregationTemporalityCumulative {
s.logger.Warn(
"unexpected, all cumulative temporality should be converted to delta",
zap.String("name", m.Name()),
zap.String("type", m.Type().String()),
)
return
}
s.mergeNumberDataPoints(m.Name(), m.Sum().DataPoints(), resAttrs)
case pmetric.MetricTypeHistogram:
if m.Histogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative {
s.logger.Warn(
"unexpected, all cumulative temporality should be converted to delta",
zap.String("name", m.Name()),
zap.String("type", m.Type().String()),
)
return
}
s.mergeHistogramDataPoints(m.Name(), m.Histogram().DataPoints(), resAttrs)
default:
s.logger.Warn(
"metric type not implemented",
zap.String("type", m.Type().String()),
)
}
}
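// mergeNumberDataPoints merges incoming number data points into the
// cached data point of every matching config: Last overwrites the
// cached value, Sum accumulates it, and Rate accumulates it while
// tracking the earliest start and the latest end timestamps so that
// the rate can be derived at query time.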
func (s *Store) mergeNumberDataPoints(
name string,
from pmetric.NumberDataPointSlice,
resAttrs pcommon.Map,
) {
if s.nums == nil {
s.nums = make(keyToGroupToMetric[pmetric.NumberDataPoint])
}
for i := 0; i < from.Len(); i++ {
dp := from.At(i)
attrs := dp.Attributes()
for _, cfg := range s.filterCfgs(name, attrs, resAttrs) {
to := getMergeTo(s.nums, pmetric.NewNumberDataPoint, cfg, attrs, resAttrs)
switch cfg.Type {
case Last:
to.SetDoubleValue(doubleValue(dp))
case Sum:
to.SetDoubleValue(to.DoubleValue() + doubleValue(dp))
case Rate:
val := doubleValue(dp)
if val != 0 {
to.SetDoubleValue(to.DoubleValue() + val)
// Use to.StartTimestamp and to.Timestamp to cache the lowest
// and the highest observed timestamps; they are read at query
// time to calculate the rate.
if to.StartTimestamp() == 0 {
// If the data point has a start timestamp then use that
// as the start timestamp, else use the end timestamp.
if dp.StartTimestamp() != 0 {
to.SetStartTimestamp(dp.StartTimestamp())
} else {
to.SetStartTimestamp(dp.Timestamp())
}
}
if to.Timestamp() < dp.Timestamp() {
to.SetTimestamp(dp.Timestamp())
}
}
default:
s.logger.Warn(
"aggregation type not available for number data points",
zap.String("name", name),
zap.String("agg_type", string(cfg.Type)),
)
}
}
}
}
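// getNumAggByType computes the queried value of a merged number data
// point. For Rate, the accumulated value is divided by the covered
// time window: for example, data points summing to 30 with the
// earliest start timestamp at t=10s and the latest end timestamp at
// t=20s yield a rate of 30 / (20 - 10) = 3 per second.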
func getNumAggByType(typ AggregationType, dp pmetric.NumberDataPoint) float64 {
switch typ {
case Rate:
if dp.DoubleValue() == 0 {
return 0
}
duration := time.Duration(dp.Timestamp() - dp.StartTimestamp()).Seconds()
if duration <= 0 {
return 0
}
return dp.DoubleValue() / duration
case Last, Sum:
return dp.DoubleValue()
default:
// Unreachable: unsupported aggregation types are rejected when metrics are consumed.
return 0
}
}
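// mergeHistogramDataPoints merges incoming histogram data points into
// the cached data point of every matching config. Only Sum and
// Percentile aggregations apply to histograms.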
func (s *Store) mergeHistogramDataPoints(
name string,
from pmetric.HistogramDataPointSlice,
resAttrs pcommon.Map,
) {
if s.hists == nil {
s.hists = make(keyToGroupToMetric[pmetric.HistogramDataPoint])
}
for i := 0; i < from.Len(); i++ {
fromDP := from.At(i)
if fromDP.Count() == 0 {
// Skip histogram data points with no population.
continue
}
attrs := fromDP.Attributes()
for _, cfg := range s.filterCfgs(name, attrs, resAttrs) {
toDP := getMergeTo(s.hists, pmetric.NewHistogramDataPoint, cfg, attrs, resAttrs)
switch cfg.Type {
case Sum, Percentile:
addHistogramDataPoint(fromDP, toDP)
default:
s.logger.Warn(
"aggregation type not available for histogram data points",
zap.String("name", name),
zap.String("agg_type", string(cfg.Type)),
)
}
}
}
}
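// getHistAggByType computes the queried value of a merged histogram
// data point: either the p-th percentile estimated from the explicit
// bucket counts, or the sum of all observed values.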
func getHistAggByType(typ AggregationType, p float64, dp pmetric.HistogramDataPoint) float64 {
switch typ {
case Percentile:
// Need to convert percentile to quantile.
return deltaExplicitBucketsQuantile(p/100, explicitBucketsFromHistogramDataPoint(dp))
case Sum:
return dp.Sum()
default:
// Unreachable: unsupported aggregation types are rejected when metrics are consumed.
return 0
}
}
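// filterCfgs returns all aggregation configs whose metric name and
// label matchers are satisfied by the given data point and resource
// attributes.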
func (s *Store) filterCfgs(
name string,
attrs, resAttrs pcommon.Map,
) []AggregationConfig {
cfgs, ok := s.nameM[name]
if !ok {
return nil
}
var result []AggregationConfig
for _, cfg := range cfgs {
if cfg.isEqualAttrs(name, attrs, resAttrs) {
result = append(result, cfg)
}
}
return result
}
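// getMergeTo returns the cached data point for the given config key
// and group-by value, lazily creating the nested map entry and a
// zero-valued data point (via initFn) on first use.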
func getMergeTo[T metric](
m keyToGroupToMetric[T],
initFn func() T,
cfg AggregationConfig,
attrs, resAttrs pcommon.Map,
) T {
grp := getValueFromMaps(cfg.GroupBy, attrs, resAttrs).AsString()
if _, ok := m[cfg.Key]; !ok {
m[cfg.Key] = make(map[string]T)
}
if _, ok := m[cfg.Key][grp]; !ok {
m[cfg.Key][grp] = initFn()
}
return m[cfg.Key][grp]
}
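// validateAndGroupAggregationConfigs validates each config and indexes
// the set both by unique key and by metric name. It rejects invalid
// configs, duplicate keys, and conflicting configs that target the
// same metric name and label values.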
func validateAndGroupAggregationConfigs(src []AggregationConfig) (keyToAggConfig, metricNameToAggConfigs, error) {
nameM := make(map[string][]AggregationConfig)
keyM := make(map[string]*AggregationConfig)
for i := range src {
srcCfg := src[i]
if err := srcCfg.validate(); err != nil {
return nil, nil, err
}
if _, seen := keyM[srcCfg.Key]; seen {
return nil, nil, fmt.Errorf(
"key should be unique, found duplicate: %s", srcCfg.Key)
}
if toCfgs, ok := nameM[srcCfg.Name]; ok {
for _, toCfg := range toCfgs {
if err := checkDuplicateAggConfigForName(toCfg, srcCfg); err != nil {
return nil, nil, err
}
}
}
nameM[srcCfg.Name] = append(nameM[srcCfg.Name], srcCfg)
keyM[srcCfg.Key] = &srcCfg
}
return keyM, nameM, nil
}
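// checkDuplicateAggConfigForName returns an error if two configs
// target the same metric name with identical label matchers and
// either conflict in aggregation type or are exact duplicates. Two
// Percentile configs that differ only in percentile are allowed.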
func checkDuplicateAggConfigForName(src, target AggregationConfig) error {
if src.Name != target.Name {
return nil
}
if len(src.MatchLabelValues) != len(target.MatchLabelValues) {
return nil
}
for k, v := range src.MatchLabelValues {
targetV, ok := target.MatchLabelValues[k]
if !ok || v != targetV {
return nil
}
}
if src.Type != target.Type {
return fmt.Errorf("cannot record same metric with different types: %s", src.Name)
}
// At this point the aggregation types are equal; two percentile
// configs that differ only in percentile are considered distinct.
if src.Type == Percentile && src.Percentile != target.Percentile {
return nil
}
return fmt.Errorf("duplicate config found for name: %s", src.Name)
}
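// doubleValue returns the value of a number data point as a float64,
// converting integer values and defaulting to 0 for empty values.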
func doubleValue(dp pmetric.NumberDataPoint) float64 {
switch dp.ValueType() {
case pmetric.NumberDataPointValueTypeDouble:
return dp.DoubleValue()
case pmetric.NumberDataPointValueTypeInt:
return float64(dp.IntValue())
default:
return 0
}
}
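// getValueFromMaps returns the value of key from the first map that
// contains it, or an empty value if none of the maps do.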
func getValueFromMaps(key string, maps ...pcommon.Map) pcommon.Value {
for _, m := range maps {
v, ok := m.Get(key)
if ok {
return v
}
}
return pcommon.NewValueEmpty()
}