in aggregators/aggregator.go [176:234]
func (a *Aggregator) AggregateCombinedMetrics(
ctx context.Context,
cmk CombinedMetricsKey,
cm *aggregationpb.CombinedMetrics,
) error {
cmIDAttrs := a.cfg.CombinedMetricsIDToKVs(cmk.ID)
traceAttrs := append(cmIDAttrs,
attribute.String(aggregationIvlKey, formatDuration(cmk.Interval)),
attribute.String("processing_time", cmk.ProcessingTime.String()),
)
ctx, span := a.cfg.Tracer.Start(ctx, "AggregateCombinedMetrics", trace.WithAttributes(traceAttrs...))
defer span.End()
a.mu.Lock()
defer a.mu.Unlock()
select {
case <-ctx.Done():
return ctx.Err()
case <-a.closed:
return ErrAggregatorClosed
default:
}
if cmk.ProcessingTime.Before(a.processingTime.Add(-a.cfg.Lookback)) {
a.metrics.EventsProcessed.Add(
context.Background(), cm.EventsTotal,
metric.WithAttributeSet(attribute.NewSet(
append(a.cfg.CombinedMetricsIDToKVs(cmk.ID),
attribute.String(aggregationIvlKey, formatDuration(cmk.Interval)),
telemetry.WithFailure(),
)...,
)),
)
a.cfg.Logger.Warn(
"received expired combined metrics, dropping silently",
zap.Time("received_processing_time", cmk.ProcessingTime),
zap.Time("current_processing_time", a.processingTime),
)
return nil
}
var attrSetOpt metric.MeasurementOption
bytesIn, err := a.aggregate(ctx, cmk, cm)
if err != nil {
attrSetOpt = metric.WithAttributeSet(
attribute.NewSet(append(cmIDAttrs, telemetry.WithFailure())...),
)
} else {
attrSetOpt = metric.WithAttributeSet(
attribute.NewSet(append(cmIDAttrs, telemetry.WithSuccess())...),
)
}
span.SetAttributes(attribute.Int("bytes_ingested", bytesIn))
a.cachedEvents.add(cmk.Interval, cmk.ID, cm.EventsTotal)
a.metrics.BytesProcessed.Add(context.Background(), int64(bytesIn), attrSetOpt)
return err
}