in aggregators/aggregator.go [121:170]
// AggregateBatch aggregates every event in b under the combined metrics ID id,
// once per interval in a.cfg.AggregationIntervals.
//
// The aggregator mutex is held for the whole call. If, on entry, ctx is
// already done or the aggregator has been closed, the call returns ctx.Err()
// or ErrAggregatorClosed respectively without aggregating anything.
//
// A failure to aggregate one event does not stop the batch: the remaining
// events are still processed, and all failures are returned joined into a
// single error. The byte counts reported by aggregateAPMEvent are recorded on
// the BytesProcessed metric, split by outcome; the failure series is only
// recorded when at least one event failed.
func (a *Aggregator) AggregateBatch(
	ctx context.Context,
	id [16]byte,
	b *modelpb.Batch,
) error {
	cmIDAttrs := a.cfg.CombinedMetricsIDToKVs(id)
	// Cap the slice at its length so that every append below is forced to
	// copy into a fresh backing array. Without this, if the slice returned by
	// the config callback has spare capacity, the failure and success appends
	// share one backing array; attribute.NewSet sorts the slice it receives
	// in place, so building the failure set could reorder or overwrite the
	// attributes later used for the success set.
	cmIDAttrs = cmIDAttrs[:len(cmIDAttrs):len(cmIDAttrs)]

	a.mu.Lock()
	defer a.mu.Unlock()

	// Non-blocking check: bail out early on cancellation or closure.
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-a.closed:
		return ErrAggregatorClosed
	default:
	}

	var (
		errs                    []error
		successBytes, failBytes int64
	)
	cmk := CombinedMetricsKey{ID: id}
	for _, ivl := range a.cfg.AggregationIntervals {
		cmk.ProcessingTime = a.processingTime.Truncate(ivl)
		cmk.Interval = ivl
		for _, e := range *b {
			bytesIn, err := a.aggregateAPMEvent(ctx, cmk, e)
			if err != nil {
				errs = append(errs, err)
				failBytes += int64(bytesIn)
				continue
			}
			successBytes += int64(bytesIn)
		}
		// The whole batch size is added per interval, regardless of
		// individual event failures.
		a.cachedEvents.add(ivl, id, float64(len(*b)))
	}

	// NOTE(review): metrics are recorded with context.Background() rather
	// than ctx, presumably so they are still emitted if ctx is cancelled
	// while the batch is being processed; confirm.
	var err error
	if len(errs) > 0 {
		a.metrics.BytesProcessed.Add(context.Background(), failBytes, metric.WithAttributeSet(
			attribute.NewSet(append(cmIDAttrs, telemetry.WithFailure())...),
		))
		err = fmt.Errorf("failed batch aggregation:\n%w", errors.Join(errs...))
	}
	a.metrics.BytesProcessed.Add(context.Background(), successBytes, metric.WithAttributeSet(
		attribute.NewSet(append(cmIDAttrs, telemetry.WithSuccess())...),
	))
	return err
}