processor/lsmintervalprocessor/internal/merger/value.go
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package merger // import "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/merger"
import (
"encoding/binary"
"errors"
"fmt"
"slices"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/config"
"github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/identity"
"github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/merger/limits"
)
const (
version = uint8(1)
overflowMetricName = "_overflow_metric"
overflowMetricDesc = "Overflow metric count due to metric limit"
overflowDatapointMetricName = "_overflow_datapoints"
overflowDatapointMetricDesc = "Overflow datapoint count due to datapoint limit"
)
// Value defines the data structure used to perform merges and other operations
// for the underlying LSM database. The basic representation of the Value is
// based on the pmetric data structure. To aid merging and other operations,
// the pmetric data structure is expanded into multiple lookup maps as
// required.
//
// Value also tracks overflows based on the configured limits. Once a limit
// is breached, new metrics are handled as per the configured overflow
// behaviour. The Value can be in one of two states:
//
//  1. Unexpanded state: the lookup maps are not created. The limit trackers,
//     if present, are encoded into a separate field. This state is immutable,
//     i.e. the pmetric structure and the metrics cannot be modified.
//
//  2. Expanded state: the lookup maps are created, and the limit trackers are
//     decoded and paired with their corresponding pmetric data structures.
//     The value is automatically upgraded to the expanded state when a merge
//     operation is performed.
//
// Value is not safe for concurrent use.
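//
// A typical round trip through the database looks like the following
// hypothetical sketch (the limit configs and the bucket count of 160 are
// placeholders):
//
//	v := NewValue(resLimit, scopeLimit, metricLimit, dpLimit, 160)
//	if err := v.Unmarshal(stored); err != nil { // unexpanded state
//		return err
//	}
//	if err := v.Merge(other); err != nil { // upgrades to the expanded state
//		return err
//	}
//	b, err := v.AppendBinary(nil) // re-encodes trackers and pmetric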
type Value struct {
resourceLimitCfg config.LimitConfig
scopeLimitCfg config.LimitConfig
metricLimitCfg config.LimitConfig
datapointLimitCfg config.LimitConfig
maxExponentialHistogramBuckets int
source pmetric.Metrics
trackers *limits.Trackers
// Lookup tables created from source
lookupsInitialized bool
resLookup map[identity.Resource]pdataResourceMetrics
scopeLookup map[identity.Scope]pdataScopeMetrics
metricLookup map[identity.Metric]pdataMetric
numberLookup map[identity.Stream]pmetric.NumberDataPoint
summaryLookup map[identity.Stream]pmetric.SummaryDataPoint
histoLookup map[identity.Stream]pmetric.HistogramDataPoint
expHistoLookup map[identity.Stream]pmetric.ExponentialHistogramDataPoint
}
type pdataResourceMetrics struct {
pmetric.ResourceMetrics
// Keeps track of scopes within each resource metric
scopeTracker *limits.ScopeTracker
}
type pdataScopeMetrics struct {
pmetric.ScopeMetrics
// Keeps track of metrics within each scope metric
metricTracker *limits.MetricTracker
}
type pdataMetric struct {
pmetric.Metric
// Keeps track of datapoints within each metric
datapointTracker *limits.Tracker
}
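// The limit trackers mirror the pmetric hierarchy, as reflected in the types
// above: the top-level limits.Trackers holds the resource tracker, each
// resource metrics entry holds a ScopeTracker, each scope metrics entry holds
// a MetricTracker, and each metric holds a datapoint Tracker.
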
// NewValue creates a new instance of the value with the configured limiters.
func NewValue(
resLimit, scopeLimit, metricLimit, datapointLimit config.LimitConfig,
maxExponentialHistogramBuckets int,
) *Value {
return &Value{
resourceLimitCfg: resLimit,
scopeLimitCfg: scopeLimit,
metricLimitCfg: metricLimit,
datapointLimitCfg: datapointLimit,
maxExponentialHistogramBuckets: maxExponentialHistogramBuckets,
source: pmetric.NewMetrics(),
}
}
// AppendBinary marshals the value into its binary representation,
// and appends it to b.
//
// Limit trackers and pmetric are marshaled into the same binary
// representation.
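//
// As implemented below, the encoding is:
//
//	version (1 byte) | trackers length (4 bytes, big endian) | trackers | pmetric protobuf
//
// A value with no datapoints encodes as just the version byte.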
func (s *Value) AppendBinary(b []byte) ([]byte, error) {
b = append(b, version)
if s.source.DataPointCount() == 0 {
// Nothing to marshal
return b, nil
}
var marshaler pmetric.ProtoMarshaler
pmb, err := marshaler.MarshalMetrics(s.source)
if err != nil {
return nil, fmt.Errorf("failed to marshal metrics: %w", err)
}
lenOffset := len(b)
b = slices.Grow(b, 4)
b = b[:lenOffset+4] // leave space for the length of encoded trackers
b, err = s.trackers.AppendBinary(b)
if err != nil {
return nil, fmt.Errorf("failed to marshal trackers: %w", err)
}
trackersLen := len(b) - lenOffset - 4
binary.BigEndian.PutUint32(b[lenOffset:lenOffset+4], uint32(trackersLen))
b = append(b, pmb...)
return b, nil
}
// Unmarshal unmarshals the binary into the value struct. The value consists
// of the pmetric data structure and a set of limit trackers recording the
// overflows at each level of the pmetric hierarchy (resources, scopes,
// metrics, and datapoints). The trackers are marshaled and encoded separately
// from the pmetric data structure.
func (s *Value) Unmarshal(data []byte) error {
if len(data) == 0 {
return errors.New("failed to unmarshal value, invalid length")
}
if data[0] != version {
return fmt.Errorf("unsupported version: %d", data[0])
}
data = data[1:]
if len(data) == 0 {
return nil
}
if len(data) < 4 {
		// For a non-empty value, the trackers length prefix must be present
return errors.New("failed to unmarshal value, invalid length")
}
trackersLen := int(binary.BigEndian.Uint32(data[:4]))
data = data[4:]
if trackersLen > 0 {
// Unmarshal trackers
s.trackers = limits.NewTrackers(
uint64(s.resourceLimitCfg.MaxCardinality),
uint64(s.scopeLimitCfg.MaxCardinality),
uint64(s.metricLimitCfg.MaxCardinality),
uint64(s.datapointLimitCfg.MaxCardinality),
)
err := s.trackers.Unmarshal(data[:trackersLen])
if err != nil {
return fmt.Errorf("failed to unmarshal limits: %w", err)
}
data = data[trackersLen:]
}
// Unmarshal pmetric.Metrics
var unmarshaler pmetric.ProtoUnmarshaler
var err error
s.source, err = unmarshaler.UnmarshalMetrics(data)
if err != nil {
return fmt.Errorf("failed to unmarshal data: %w", err)
}
return nil
}
// Merge merges the provided value into the current value instance.
func (v *Value) Merge(op *Value) error {
if op == nil || op.source.DataPointCount() == 0 {
// Nothing to merge
return nil
}
// Initialize the destination lookup table
v.initLookupTables()
// Iterate over the source's pmetric structure and merge into destination
rmsOther := op.source.ResourceMetrics()
for i := 0; i < rmsOther.Len(); i++ {
rmOther := rmsOther.At(i)
resID, rm, err := v.addResourceMetrics(rmOther)
if err != nil {
return fmt.Errorf("failed while merging resource metrics: %w", err)
}
scopeTracker := op.trackers.GetScopeTracker(i)
if scopeTracker != nil {
if err := rm.scopeTracker.MergeEstimators(scopeTracker.Tracker); err != nil {
return fmt.Errorf("failed to merge scope overflow estimators: %w", err)
}
}
smsOther := rmOther.ScopeMetrics()
for j := 0; j < smsOther.Len(); j++ {
smOther := smsOther.At(j)
scopeID, sm, err := v.addScopeMetrics(resID, rm, smOther)
if err != nil {
return fmt.Errorf("failed while merging scope metrics: %w", err)
}
metricTracker := scopeTracker.GetMetricTracker(j)
if metricTracker != nil {
if err := sm.metricTracker.MergeEstimators(metricTracker.Tracker); err != nil {
return fmt.Errorf("failed to merge scope datapoints overflow estimators: %w", err)
}
}
msOther := smOther.Metrics()
for k := 0; k < msOther.Len(); k++ {
mOther := msOther.At(k)
metricID, m, overflow := v.addMetric(scopeID, sm, mOther)
if overflow {
					// On metric overflow, we discard any datapoint overflow
					// estimator since metric overflow accounts only for
					// unique metrics.
continue
}
dpTracker := metricTracker.GetDatapointTracker(k)
if dpTracker != nil {
if err := m.datapointTracker.MergeEstimators(dpTracker); err != nil {
return fmt.Errorf("failed to merge datapoint overflow estimators: %w", err)
}
}
if err := v.mergeMetric(metricID, m, mOther); err != nil {
return fmt.Errorf("failed to merge metric: %w", err)
}
}
}
}
if op.trackers != nil {
if err := v.trackers.GetResourceTracker().MergeEstimators(
op.trackers.GetResourceTracker(),
); err != nil {
return fmt.Errorf("failed to merge resource overflow estimators: %w", err)
}
}
return nil
}
// MergeMetric merges the provided metric, together with its parent resource
// and scope metrics, into the value. Overflows during the merge are handled
// as per the configured overflow behaviour.
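//
// For example, a caller folding an incoming pmetric.Metrics into the value
// could iterate the hierarchy as in this hypothetical sketch (md and v are
// placeholders):
//
//	rms := md.ResourceMetrics()
//	for i := 0; i < rms.Len(); i++ {
//		sms := rms.At(i).ScopeMetrics()
//		for j := 0; j < sms.Len(); j++ {
//			ms := sms.At(j).Metrics()
//			for k := 0; k < ms.Len(); k++ {
//				if err := v.MergeMetric(rms.At(i), sms.At(j), ms.At(k)); err != nil {
//					return err
//				}
//			}
//		}
//	}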
func (v *Value) MergeMetric(
otherRm pmetric.ResourceMetrics,
otherSm pmetric.ScopeMetrics,
otherM pmetric.Metric,
) error {
if metricDPsCount(otherM) == 0 {
		// Nothing to merge; the metric either has no datapoints or is of an
		// unsupported type
return nil
}
v.initLookupTables()
	// TODO: Precheck the metric for datapoint existence; if none exist
	// then don't add resource/scope metrics. This will help remove the
	// if checks from the finalize method.
	// OR
	// Do this check in the prior call where we know the metric type.
resID, rm, err := v.addResourceMetrics(otherRm)
if err != nil {
return err
}
scopeID, sm, err := v.addScopeMetrics(resID, rm, otherSm)
if err != nil {
return err
}
metricID, m, overflow := v.addMetric(scopeID, sm, otherM)
if overflow {
return nil
}
return v.mergeMetric(metricID, m, otherM)
}
// Finalize finalizes all overflows in the metrics to prepare them for
// harvest. This method must be called only once, at harvest time.
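//
// Conceptually, each overflow metric added here is a delta-temporality sum
// with a single integer datapoint, for example:
//
//	name:        _overflow_metric (or _overflow_datapoints)
//	description: Overflow metric count due to metric limit
//	value:       cardinality estimate of the overflowed entries
//	attributes:  as configured for the corresponding overflow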
func (s *Value) Finalize() (pmetric.Metrics, error) {
	// At this point the metrics are being returned as the final step in the
	// store, so prepare the final representation. This requires adding the
	// overflow metrics for any breached limits, and ensuring that the lookup
	// tables, and thus the limit trackers, are initialized.
s.initLookupTables()
for _, sm := range s.scopeLookup {
if !sm.metricTracker.HasOverflow() {
continue
}
// Add overflow metric due to metric limit breached
if err := fillOverflowMetric(
sm.ScopeMetrics.Metrics().AppendEmpty(),
overflowMetricName,
overflowMetricDesc,
sm.metricTracker.EstimateOverflow(),
s.metricLimitCfg.Overflow.Attributes,
); err != nil {
return pmetric.Metrics{}, fmt.Errorf("failed to finalize merged metric: %w", err)
}
}
for mID, m := range s.metricLookup {
if !m.datapointTracker.HasOverflow() {
continue
}
// Add overflow metric due to datapoint limit breached
sm := s.scopeLookup[mID.Scope()]
if err := fillOverflowMetric(
sm.ScopeMetrics.Metrics().AppendEmpty(),
overflowDatapointMetricName,
overflowDatapointMetricDesc,
m.datapointTracker.EstimateOverflow(),
s.datapointLimitCfg.Overflow.Attributes,
); err != nil {
return pmetric.Metrics{}, fmt.Errorf("failed to finalize merged metric: %w", err)
}
}
return s.source, nil
}
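// initLookupTables expands the value, creating the lookup maps from the
// source pmetric data structure and pairing each entry with its limit
// tracker. It is idempotent: subsequent calls are no-ops.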
func (s *Value) initLookupTables() {
if s.lookupsInitialized {
return
}
s.lookupsInitialized = true
	// The top-level lookup maps are always allocated on initialization
s.resLookup = make(map[identity.Resource]pdataResourceMetrics)
s.scopeLookup = make(map[identity.Scope]pdataScopeMetrics)
s.metricLookup = make(map[identity.Metric]pdataMetric)
rms := s.source.ResourceMetrics()
if rms.Len() == 0 {
// Nothing to merge
return
}
	// Initialize the lookup tables assuming that the limits were respected
	// for the marshaled data and that unexpected overflows will not happen.
	// Initialization accesses the maps directly, without checking overflows,
	// to avoid accounting overflow buckets as normal buckets.
for i := 0; i < rms.Len(); i++ {
rm := rms.At(i)
rmID := identity.OfResource(rm.Resource())
scopeTracker := s.trackers.GetScopeTracker(i)
s.resLookup[rmID] = pdataResourceMetrics{
ResourceMetrics: rm,
scopeTracker: scopeTracker,
}
sms := rm.ScopeMetrics()
for j := 0; j < sms.Len(); j++ {
sm := sms.At(j)
scope := sm.Scope()
smID := identity.OfScope(rmID, scope)
metricTracker := scopeTracker.GetMetricTracker(j)
s.scopeLookup[smID] = pdataScopeMetrics{
ScopeMetrics: sm,
metricTracker: metricTracker,
}
metrics := sm.Metrics()
for k := 0; k < metrics.Len(); k++ {
m := metrics.At(k)
mID := identity.OfMetric(smID, m)
s.metricLookup[mID] = pdataMetric{
Metric: m,
datapointTracker: metricTracker.GetDatapointTracker(k),
}
//exhaustive:enforce
switch m.Type() {
case pmetric.MetricTypeEmpty:
continue
case pmetric.MetricTypeGauge:
// TODO (lahsivjar): implement gauge support
case pmetric.MetricTypeSum:
if s.numberLookup == nil {
s.numberLookup = make(map[identity.Stream]pmetric.NumberDataPoint)
}
dps := m.Sum().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
streamID := identity.OfStream(mID, dp)
s.numberLookup[streamID] = dp
}
case pmetric.MetricTypeSummary:
if s.summaryLookup == nil {
s.summaryLookup = make(map[identity.Stream]pmetric.SummaryDataPoint)
}
dps := m.Summary().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
streamID := identity.OfStream(mID, dp)
s.summaryLookup[streamID] = dp
}
case pmetric.MetricTypeHistogram:
if s.histoLookup == nil {
s.histoLookup = make(map[identity.Stream]pmetric.HistogramDataPoint)
}
dps := m.Histogram().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
streamID := identity.OfStream(mID, dp)
s.histoLookup[streamID] = dp
}
case pmetric.MetricTypeExponentialHistogram:
if s.expHistoLookup == nil {
s.expHistoLookup = make(map[identity.Stream]pmetric.ExponentialHistogramDataPoint)
}
dps := m.ExponentialHistogram().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
streamID := identity.OfStream(mID, dp)
s.expHistoLookup[streamID] = dp
}
}
}
}
}
}
// addResourceMetrics adds a new resource metrics entry to the store while
// applying the resource limiter. If a limit is configured and breached by
// adding the provided resource metrics, an overflow resource metrics entry
// is created and returned instead.
func (s *Value) addResourceMetrics(
otherRm pmetric.ResourceMetrics,
) (identity.Resource, pdataResourceMetrics, error) {
resID := identity.OfResource(otherRm.Resource())
if rm, ok := s.resLookup[resID]; ok {
return resID, rm, nil
}
if s.trackers == nil {
s.trackers = limits.NewTrackers(
uint64(s.resourceLimitCfg.MaxCardinality),
uint64(s.scopeLimitCfg.MaxCardinality),
uint64(s.metricLimitCfg.MaxCardinality),
uint64(s.datapointLimitCfg.MaxCardinality),
)
}
if s.trackers.GetResourceTracker().CheckOverflow(resID.Hash) {
// Overflow, get/prepare an overflow bucket
overflowResID, err := s.getOverflowResourceIdentity()
if err != nil {
return identity.Resource{}, pdataResourceMetrics{}, err
}
if rm, ok := s.resLookup[overflowResID]; ok {
return overflowResID, rm, nil
}
overflowRm := s.source.ResourceMetrics().AppendEmpty()
overflowRm.SetSchemaUrl(otherRm.SchemaUrl())
if err := decorate(
overflowRm.Resource().Attributes(),
s.resourceLimitCfg.Overflow.Attributes,
); err != nil {
return identity.Resource{}, pdataResourceMetrics{}, err
}
rm := pdataResourceMetrics{
ResourceMetrics: overflowRm,
scopeTracker: s.trackers.NewScopeTracker(),
}
s.resLookup[overflowResID] = rm
return overflowResID, rm, nil
}
// Clone it *without* the ScopeMetricsSlice data
rmOrig := s.source.ResourceMetrics().AppendEmpty()
rmOrig.SetSchemaUrl(otherRm.SchemaUrl())
otherRm.Resource().CopyTo(rmOrig.Resource())
rm := pdataResourceMetrics{
ResourceMetrics: rmOrig,
scopeTracker: s.trackers.NewScopeTracker(),
}
s.resLookup[resID] = rm
return resID, rm, nil
}
// addScopeMetrics adds a new scope metrics entry to the store while applying
// the scope limiter. If a limit is configured and breached by adding the
// provided scope metrics, an overflow scope metrics entry is created and
// returned instead.
func (s *Value) addScopeMetrics(
resID identity.Resource,
rm pdataResourceMetrics,
otherSm pmetric.ScopeMetrics,
) (identity.Scope, pdataScopeMetrics, error) {
scopeID := identity.OfScope(resID, otherSm.Scope())
if sm, ok := s.scopeLookup[scopeID]; ok {
return scopeID, sm, nil
}
if rm.scopeTracker.CheckOverflow(scopeID.Hash) {
// Overflow, get/prepare an overflow bucket
overflowScopeID, err := s.getOverflowScopeIdentity(resID)
if err != nil {
return identity.Scope{}, pdataScopeMetrics{}, err
}
if sm, ok := s.scopeLookup[overflowScopeID]; ok {
return overflowScopeID, sm, nil
}
overflowScope := rm.ScopeMetrics().AppendEmpty()
overflowScope.SetSchemaUrl(otherSm.SchemaUrl())
if err := decorate(
overflowScope.Scope().Attributes(),
s.scopeLimitCfg.Overflow.Attributes,
); err != nil {
return identity.Scope{}, pdataScopeMetrics{}, err
}
sm := pdataScopeMetrics{
ScopeMetrics: overflowScope,
metricTracker: rm.scopeTracker.NewMetricTracker(),
}
s.scopeLookup[overflowScopeID] = sm
return overflowScopeID, sm, nil
}
// Clone it *without* the MetricSlice data
smOrig := rm.ScopeMetrics().AppendEmpty()
otherSm.Scope().CopyTo(smOrig.Scope())
smOrig.SetSchemaUrl(otherSm.SchemaUrl())
sm := pdataScopeMetrics{
ScopeMetrics: smOrig,
metricTracker: rm.scopeTracker.NewMetricTracker(),
}
s.scopeLookup[scopeID] = sm
return scopeID, sm, nil
}
// addMetric adds the given metric to the store while applying the metric
// limiter. If a limit is configured and breached by adding the metric, the
// metric is discarded and the overflow is recorded in the metric tracker; at
// harvest time a sum metric with delta temporality is added, tracking the
// cardinality estimate of the overflowed metrics. The returned bool is `true`
// if there is an overflow and `false` otherwise.
func (s *Value) addMetric(
scopeID identity.Scope,
sm pdataScopeMetrics,
otherM pmetric.Metric,
) (identity.Metric, pdataMetric, bool) {
mID := identity.OfMetric(scopeID, otherM)
if m, ok := s.metricLookup[mID]; ok {
return mID, m, false
}
if sm.metricTracker.CheckOverflow(mID.Hash) {
		// Metric overflow detected. No action needs to be taken at this
		// point since a metric overflow results in a new sum metric
		// recording the number of unique metrics that overflowed. This
		// number is recorded in the limit tracker and the overflow metric
		// is populated on demand (see Finalize).
return identity.Metric{}, pdataMetric{}, true
}
// Clone it *without* the datapoint data
mOrig := sm.Metrics().AppendEmpty()
mOrig.SetName(otherM.Name())
mOrig.SetDescription(otherM.Description())
mOrig.SetUnit(otherM.Unit())
switch otherM.Type() {
case pmetric.MetricTypeGauge:
mOrig.SetEmptyGauge()
case pmetric.MetricTypeSummary:
mOrig.SetEmptySummary()
case pmetric.MetricTypeSum:
otherSum := otherM.Sum()
sum := mOrig.SetEmptySum()
sum.SetAggregationTemporality(otherSum.AggregationTemporality())
sum.SetIsMonotonic(otherSum.IsMonotonic())
case pmetric.MetricTypeHistogram:
otherHist := otherM.Histogram()
hist := mOrig.SetEmptyHistogram()
hist.SetAggregationTemporality(otherHist.AggregationTemporality())
case pmetric.MetricTypeExponentialHistogram:
otherExp := otherM.ExponentialHistogram()
exp := mOrig.SetEmptyExponentialHistogram()
exp.SetAggregationTemporality(otherExp.AggregationTemporality())
}
m := pdataMetric{
Metric: mOrig,
datapointTracker: sm.metricTracker.NewDatapointTracker(),
}
s.metricLookup[mID] = m
return mID, m, false
}
// addSumDataPoint returns the data point in the store corresponding to the
// given metric and the external data point, if present. If the data point is
// not present then a new data point is added, unless adding it breaches the
// configured limit, in which case an empty data point is returned. The
// returned bool is `true` if the data point already exists and `false`
// otherwise.
func (s *Value) addSumDataPoint(
metricID identity.Metric,
metric pdataMetric,
otherDP pmetric.NumberDataPoint,
) (pmetric.NumberDataPoint, bool) {
streamID := identity.OfStream(metricID, otherDP)
if s.numberLookup == nil {
s.numberLookup = make(map[identity.Stream]pmetric.NumberDataPoint)
} else if dp, ok := s.numberLookup[streamID]; ok {
return dp, true
}
if metric.datapointTracker.CheckOverflow(streamID.Hash) {
		// Datapoint overflow detected. No action needs to be taken at this
		// point since a datapoint overflow results in a new sum metric
		// recording the number of unique datapoints that overflowed. This
		// number is recorded in the limit tracker and the overflow metric
		// is populated on demand (see Finalize).
return pmetric.NumberDataPoint{}, false
}
dp := metric.Sum().DataPoints().AppendEmpty()
// New datapoint created, so copy the otherDP to the new one
otherDP.CopyTo(dp)
s.numberLookup[streamID] = dp
return dp, false
}
// addSummaryDataPoint returns the data point in the store corresponding to
// the given metric and the external data point, if present. If the data point
// is not present then a new data point is added, unless adding it breaches
// the configured limit, in which case an empty data point is returned. The
// returned bool is `true` if the data point already exists and `false`
// otherwise.
func (s *Value) addSummaryDataPoint(
metricID identity.Metric,
metric pdataMetric,
otherDP pmetric.SummaryDataPoint,
) (pmetric.SummaryDataPoint, bool) {
streamID := identity.OfStream(metricID, otherDP)
if s.summaryLookup == nil {
s.summaryLookup = make(map[identity.Stream]pmetric.SummaryDataPoint)
} else if dp, ok := s.summaryLookup[streamID]; ok {
return dp, true
}
if metric.datapointTracker.CheckOverflow(streamID.Hash) {
		// Datapoint overflow detected. No action needs to be taken at this
		// point since a datapoint overflow results in a new sum metric
		// recording the number of unique datapoints that overflowed. This
		// number is recorded in the limit tracker and the overflow metric
		// is populated on demand (see Finalize).
return pmetric.SummaryDataPoint{}, false
}
dp := metric.Summary().DataPoints().AppendEmpty()
// New datapoint created, so copy the otherDP to the new one
otherDP.CopyTo(dp)
s.summaryLookup[streamID] = dp
return dp, false
}
// addHistogramDataPoint returns the data point in the store corresponding to
// the given metric and the external data point, if present. If the data point
// is not present then a new data point is added, unless adding it breaches
// the configured limit, in which case an empty data point is returned. The
// returned bool is `true` if the data point already exists and `false`
// otherwise.
func (s *Value) addHistogramDataPoint(
metricID identity.Metric,
metric pdataMetric,
otherDP pmetric.HistogramDataPoint,
) (pmetric.HistogramDataPoint, bool) {
streamID := identity.OfStream(metricID, otherDP)
if s.histoLookup == nil {
s.histoLookup = make(map[identity.Stream]pmetric.HistogramDataPoint)
} else if dp, ok := s.histoLookup[streamID]; ok {
return dp, true
}
if metric.datapointTracker.CheckOverflow(streamID.Hash) {
		// Datapoint overflow detected. No action needs to be taken at this
		// point since a datapoint overflow results in a new sum metric
		// recording the number of unique datapoints that overflowed. This
		// number is recorded in the limit tracker and the overflow metric
		// is populated on demand (see Finalize).
return pmetric.HistogramDataPoint{}, false
}
dp := metric.Histogram().DataPoints().AppendEmpty()
// New datapoint created, so copy the otherDP to the new one
otherDP.CopyTo(dp)
s.histoLookup[streamID] = dp
return dp, false
}
// addExponentialHistogramDataPoint returns the data point in the store
// corresponding to the given metric and the external data point, if present.
// If the data point is not present then a new data point is added, unless
// adding it breaches the configured limit, in which case an empty data point
// is returned. The returned bool is `true` if the data point already exists
// and `false` otherwise.
func (s *Value) addExponentialHistogramDataPoint(
metricID identity.Metric,
metric pdataMetric,
otherDP pmetric.ExponentialHistogramDataPoint,
) (pmetric.ExponentialHistogramDataPoint, bool) {
streamID := identity.OfStream(metricID, otherDP)
if s.expHistoLookup == nil {
s.expHistoLookup = make(map[identity.Stream]pmetric.ExponentialHistogramDataPoint)
} else if dp, ok := s.expHistoLookup[streamID]; ok {
return dp, true
}
if metric.datapointTracker.CheckOverflow(streamID.Hash) {
		// Datapoint overflow detected. No action needs to be taken at this
		// point since a datapoint overflow results in a new sum metric
		// recording the number of unique datapoints that overflowed. This
		// number is recorded in the limit tracker and the overflow metric
		// is populated on demand (see Finalize).
return pmetric.ExponentialHistogramDataPoint{}, false
}
dp := metric.ExponentialHistogram().DataPoints().AppendEmpty()
// New datapoint created, so copy the otherDP to the new one
otherDP.CopyTo(dp)
s.expHistoLookup[streamID] = dp
return dp, false
}
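// mergeMetric merges the datapoints of the provided metric into the stored
// metric, dispatching on the metric type. Gauge metrics are not yet
// supported and are reported as an unsupported metric type.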
func (v *Value) mergeMetric(
metricID identity.Metric,
m pdataMetric,
otherM pmetric.Metric,
) error {
switch typ := otherM.Type(); typ {
case pmetric.MetricTypeSum:
return mergeDataPoints(
otherM.Sum().DataPoints(),
metricID,
m,
v.addSumDataPoint,
otherM.Sum().AggregationTemporality(),
v.maxExponentialHistogramBuckets,
)
case pmetric.MetricTypeSummary:
return mergeDataPoints(
otherM.Summary().DataPoints(),
metricID,
m,
v.addSummaryDataPoint,
			// Summary metrics are assumed to have cumulative temporality
pmetric.AggregationTemporalityCumulative,
v.maxExponentialHistogramBuckets,
)
case pmetric.MetricTypeHistogram:
return mergeDataPoints(
otherM.Histogram().DataPoints(),
metricID,
m,
v.addHistogramDataPoint,
otherM.Histogram().AggregationTemporality(),
v.maxExponentialHistogramBuckets,
)
case pmetric.MetricTypeExponentialHistogram:
return mergeDataPoints(
otherM.ExponentialHistogram().DataPoints(),
metricID,
m,
v.addExponentialHistogramDataPoint,
otherM.ExponentialHistogram().AggregationTemporality(),
v.maxExponentialHistogramBuckets,
)
default:
return fmt.Errorf("unsupported metric type: %s", typ)
}
}
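// getOverflowResourceIdentity returns the identity of the overflow resource
// bucket, i.e. a resource decorated with the configured overflow attributes.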
func (s *Value) getOverflowResourceIdentity() (identity.Resource, error) {
r := pcommon.NewResource()
if err := decorate(
r.Attributes(),
s.resourceLimitCfg.Overflow.Attributes,
); err != nil {
return identity.Resource{}, fmt.Errorf("failed to create overflow bucket: %w", err)
}
return identity.OfResource(r), nil
}
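// getOverflowScopeIdentity returns the identity of the overflow scope bucket
// within the given resource, i.e. an instrumentation scope decorated with
// the configured overflow attributes.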
func (s *Value) getOverflowScopeIdentity(
res identity.Resource,
) (identity.Scope, error) {
scope := pcommon.NewInstrumentationScope()
if err := decorate(
scope.Attributes(),
s.scopeLimitCfg.Overflow.Attributes,
); err != nil {
return identity.Scope{}, fmt.Errorf("failed to create overflow bucket: %w", err)
}
return identity.OfScope(res, scope), nil
}
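// fillOverflowMetric populates the given metric as an overflow metric: a
// delta-temporality sum with a single integer datapoint holding the overflow
// count, decorated with the configured attributes.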
func fillOverflowMetric(
m pmetric.Metric,
name, desc string,
count uint64,
extraAttrs []config.Attribute,
) error {
m.SetName(name)
m.SetDescription(desc)
sum := m.SetEmptySum()
sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
dp := sum.DataPoints().AppendEmpty()
dp.SetIntValue(int64(count))
if err := decorate(dp.Attributes(), extraAttrs); err != nil {
return fmt.Errorf("failed to append configured attributes to overflow metric: %w", err)
}
return nil
}
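// decorate adds the configured attributes to the target map, collecting any
// conversion errors into a single joined error.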
func decorate(target pcommon.Map, src []config.Attribute) error {
if len(src) == 0 {
return nil
}
var errs []error
target.EnsureCapacity(len(src))
for _, attr := range src {
v := target.PutEmpty(attr.Key)
if err := v.FromRaw(attr.Value); err != nil {
errs = append(errs, err)
}
}
	if len(errs) > 0 {
		return fmt.Errorf(
			"failed to decorate with configured attributes: %w",
			errors.Join(errs...),
		)
	}
return nil
}
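// metricDPsCount returns the number of datapoints in the given metric,
// returning 0 for gauges and other unsupported metric types.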
func metricDPsCount(m pmetric.Metric) uint64 {
switch m.Type() {
case pmetric.MetricTypeSum:
return uint64(m.Sum().DataPoints().Len())
case pmetric.MetricTypeSummary:
return uint64(m.Summary().DataPoints().Len())
case pmetric.MetricTypeHistogram:
return uint64(m.Histogram().DataPoints().Len())
case pmetric.MetricTypeExponentialHistogram:
return uint64(m.ExponentialHistogram().DataPoints().Len())
default:
		// Includes gauge, empty, and any other unsupported metric types
return 0
}
}