func()

in aggregators/aggregator.go [121:170]


func (a *Aggregator) AggregateBatch(
	ctx context.Context,
	id [16]byte,
	b *modelpb.Batch,
) error {
	cmIDAttrs := a.cfg.CombinedMetricsIDToKVs(id)

	a.mu.Lock()
	defer a.mu.Unlock()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-a.closed:
		return ErrAggregatorClosed
	default:
	}

	var (
		errs                    []error
		successBytes, failBytes int64
	)
	cmk := CombinedMetricsKey{ID: id}
	for _, ivl := range a.cfg.AggregationIntervals {
		cmk.ProcessingTime = a.processingTime.Truncate(ivl)
		cmk.Interval = ivl
		for _, e := range *b {
			bytesIn, err := a.aggregateAPMEvent(ctx, cmk, e)
			if err != nil {
				errs = append(errs, err)
				failBytes += int64(bytesIn)
			} else {
				successBytes += int64(bytesIn)
			}
		}
		a.cachedEvents.add(ivl, id, float64(len(*b)))
	}

	var err error
	if len(errs) > 0 {
		a.metrics.BytesProcessed.Add(context.Background(), failBytes, metric.WithAttributeSet(
			attribute.NewSet(append(cmIDAttrs, telemetry.WithFailure())...),
		))
		err = fmt.Errorf("failed batch aggregation:\n%w", errors.Join(errs...))
	}
	a.metrics.BytesProcessed.Add(context.Background(), successBytes, metric.WithAttributeSet(
		attribute.NewSet(append(cmIDAttrs, telemetry.WithSuccess())...),
	))
	return err
}