in cmd/otelinmemexporter/store.go [307:355]
// mergeNumberDataPoints folds every data point in from into the store's
// number aggregations (s.nums), once for each filter configuration that
// s.filterCfgs returns for the metric name, the point attributes and the
// resource attributes. s.nums is lazily allocated on first use.
//
// The aggregation type of the configuration decides how a point is merged:
//   - Last: the aggregate's value is overwritten with the point's value.
//   - Sum: the point's value is added to the aggregate's value.
//   - Rate: a non-zero value is added to the aggregate's value and the
//     aggregate's time window is widened to include the point; zero-valued
//     points are ignored and do not affect the window.
//
// Any other aggregation type is reported with a warning log and the point's
// value is not merged.
func (s *Store) mergeNumberDataPoints(
	name string,
	from pmetric.NumberDataPointSlice,
	resAttrs pcommon.Map,
) {
	if s.nums == nil {
		s.nums = make(map[string]map[string]pmetric.NumberDataPoint)
	}
	for i := 0; i < from.Len(); i++ {
		dp := from.At(i)
		attrs := dp.Attributes()
		for _, cfg := range s.filterCfgs(name, attrs, resAttrs) {
			// NOTE(review): getMergeTo is defined elsewhere; it presumably
			// returns the existing aggregate for this cfg/attribute set or
			// creates one with pmetric.NewNumberDataPoint — confirm.
			to := getMergeTo(s.nums, pmetric.NewNumberDataPoint, cfg, attrs, resAttrs)
			switch cfg.Type {
			case Last:
				to.SetDoubleValue(doubleValue(dp))
			case Sum:
				to.SetDoubleValue(to.DoubleValue() + doubleValue(dp))
			case Rate:
				val := doubleValue(dp)
				if val != 0 {
					to.SetDoubleValue(to.DoubleValue() + val)
					// We use to#StartTimestamp and to#Timestamp to cache the
					// lowest and the highest timestamps seen so far. This
					// will be used at query time to calculate rate.
					//
					// The point's effective start is its start timestamp
					// when set, otherwise its end timestamp.
					start := dp.StartTimestamp()
					if start == 0 {
						start = dp.Timestamp()
					}
					// Keep the minimum non-zero start rather than the first
					// one seen, so that out-of-order points cannot leave the
					// cached window shorter than the data it covers. Zero
					// means "not set yet" and is never treated as a minimum.
					if start != 0 && (to.StartTimestamp() == 0 || start < to.StartTimestamp()) {
						to.SetStartTimestamp(start)
					}
					if to.Timestamp() < dp.Timestamp() {
						to.SetTimestamp(dp.Timestamp())
					}
				}
			default:
				s.logger.Warn(
					"aggregation type not available for number data points",
					zap.String("name", name),
					zap.String("agg_type", string(cfg.Type)),
				)
			}
		}
	}
}