processor/lsmintervalprocessor/internal/data/add.go:
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// This is a copy of the internal module from opentelemetry-collector-contrib:
// https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/deltatocumulativeprocessor/internal/data

package data // import "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/data"

import (
	"math"

	"go.opentelemetry.io/collector/pdata/pmetric"

	"github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/data/expo"
	"github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/putil/pslice"
)

// Aggregator performs an operation on two datapoints.
// Given [pmetric] types are mutable by nature, this logically works as follows:
//
// *state = op(state, dp)
//
// See [Adder] for an implementation.
type Aggregator interface {
Numbers(state, dp pmetric.NumberDataPoint) error
Histograms(state, dp pmetric.HistogramDataPoint) error
Exponential(state, dp pmetric.ExponentialHistogramDataPoint) error
}
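
// compile-time assertion that Adder satisfies Aggregator.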
var _ Aggregator = (*Adder)(nil)
// Adder adds (+) datapoints.
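//
// A minimal usage sketch (the bucket limit of 160 below is only an
// illustrative value, not a default defined in this package):
//
//	state := pmetric.NewNumberDataPoint()
//	state.SetIntValue(3)
//	dp := pmetric.NewNumberDataPoint()
//	dp.SetIntValue(4)
//	add := NewAdder(160)
//	_ = add.Numbers(state, dp) // state now holds 7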
type Adder struct {
maxExponentialHistogramBuckets int
}
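
// NewAdder returns an Adder. maxBuckets bounds the number of buckets a merged
// exponential histogram may have; merges that would exceed it trigger a
// downscale (see Exponential).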
func NewAdder(maxBuckets int) Adder {
return Adder{maxExponentialHistogramBuckets: maxBuckets}
}
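
// Numbers adds the numeric value of dp into state. The value type of dp
// (int or double) determines which field is summed.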
func (add Adder) Numbers(state, dp pmetric.NumberDataPoint) error {
switch dp.ValueType() {
case pmetric.NumberDataPointValueTypeDouble:
v := state.DoubleValue() + dp.DoubleValue()
state.SetDoubleValue(v)
case pmetric.NumberDataPointValueTypeInt:
v := state.IntValue() + dp.IntValue()
state.SetIntValue(v)
}
return nil
}
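
// Histograms merges dp into state by summing bucket counts, counts, and sums,
// and taking the min/max where both datapoints carry them. If the explicit
// bounds differ, the observations cannot be merged and state is reset to dp.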
func (add Adder) Histograms(state, dp pmetric.HistogramDataPoint) error {
// bounds different: no way to merge, so reset observation to new boundaries
if !pslice.Equal(state.ExplicitBounds(), dp.ExplicitBounds()) {
dp.CopyTo(state)
return nil
}
	// The spec requires len(BucketCounts) == len(ExplicitBounds)+1.
	// Given the limited error handling at this stage (and having already verified
	// that the boundaries are equal), a best-effort add of whatever buckets we
	// have seems reasonable.
n := min(state.BucketCounts().Len(), dp.BucketCounts().Len())
for i := 0; i < n; i++ {
sum := state.BucketCounts().At(i) + dp.BucketCounts().At(i)
state.BucketCounts().SetAt(i, sum)
}
state.SetCount(state.Count() + dp.Count())
if state.HasSum() && dp.HasSum() {
state.SetSum(state.Sum() + dp.Sum())
} else {
state.RemoveSum()
}
if state.HasMin() && dp.HasMin() {
state.SetMin(math.Min(state.Min(), dp.Min()))
} else {
state.RemoveMin()
}
if state.HasMax() && dp.HasMax() {
state.SetMax(math.Max(state.Max(), dp.Max()))
} else {
state.RemoveMax()
}
return nil
}
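
// Exponential merges dp into state. The two datapoints are first brought to a
// common scale and zero threshold, downscaling further if the merged histogram
// would exceed the configured bucket limit; their buckets, counts, sums, and
// min/max are then combined.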
func (add Adder) Exponential(state, dp pmetric.ExponentialHistogramDataPoint) error {
type H = pmetric.ExponentialHistogramDataPoint
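	// Bring both datapoints to a common scale: downscale the one with the
	// higher (finer) scale so the buckets line up before merging.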
if state.Scale() != dp.Scale() {
hi, lo := expo.HiLo(state, dp, H.Scale)
from, to := expo.Scale(hi.Scale()), expo.Scale(lo.Scale())
expo.Downscale(hi.Positive(), from, to)
expo.Downscale(hi.Negative(), from, to)
hi.SetScale(lo.Scale())
}
	// Downscale if the expected number of buckets after the merge would exceed the configured limit.
from := expo.Scale(state.Scale())
to := min(
expo.Limit(add.maxExponentialHistogramBuckets, from, state.Positive(), dp.Positive()),
expo.Limit(add.maxExponentialHistogramBuckets, from, state.Negative(), dp.Negative()),
)
if from != to {
expo.Downscale(state.Positive(), from, to)
expo.Downscale(state.Negative(), from, to)
expo.Downscale(dp.Positive(), from, to)
expo.Downscale(dp.Negative(), from, to)
state.SetScale(int32(to))
dp.SetScale(int32(to))
}
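	// Bring both datapoints to a common zero threshold by widening the zero
	// bucket of the one with the smaller threshold.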
if state.ZeroThreshold() != dp.ZeroThreshold() {
hi, lo := expo.HiLo(state, dp, H.ZeroThreshold)
expo.WidenZero(lo, hi.ZeroThreshold())
}
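	// With scales and zero thresholds aligned, bucket counts can be merged directly.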
expo.Merge(state.Positive(), dp.Positive())
expo.Merge(state.Negative(), dp.Negative())
state.SetCount(state.Count() + dp.Count())
state.SetZeroCount(state.ZeroCount() + dp.ZeroCount())
if state.HasSum() && dp.HasSum() {
state.SetSum(state.Sum() + dp.Sum())
} else {
state.RemoveSum()
}
if state.HasMin() && dp.HasMin() {
state.SetMin(math.Min(state.Min(), dp.Min()))
} else {
state.RemoveMin()
}
if state.HasMax() && dp.HasMax() {
state.SetMax(math.Max(state.Max(), dp.Max()))
} else {
state.RemoveMax()
}
return nil
}