func()

in aggregators/aggregator.go [176:234]


func (a *Aggregator) AggregateCombinedMetrics(
	ctx context.Context,
	cmk CombinedMetricsKey,
	cm *aggregationpb.CombinedMetrics,
) error {
	cmIDAttrs := a.cfg.CombinedMetricsIDToKVs(cmk.ID)
	traceAttrs := append(cmIDAttrs,
		attribute.String(aggregationIvlKey, formatDuration(cmk.Interval)),
		attribute.String("processing_time", cmk.ProcessingTime.String()),
	)
	ctx, span := a.cfg.Tracer.Start(ctx, "AggregateCombinedMetrics", trace.WithAttributes(traceAttrs...))
	defer span.End()

	a.mu.Lock()
	defer a.mu.Unlock()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-a.closed:
		return ErrAggregatorClosed
	default:
	}

	if cmk.ProcessingTime.Before(a.processingTime.Add(-a.cfg.Lookback)) {
		a.metrics.EventsProcessed.Add(
			context.Background(), cm.EventsTotal,
			metric.WithAttributeSet(attribute.NewSet(
				append(a.cfg.CombinedMetricsIDToKVs(cmk.ID),
					attribute.String(aggregationIvlKey, formatDuration(cmk.Interval)),
					telemetry.WithFailure(),
				)...,
			)),
		)
		a.cfg.Logger.Warn(
			"received expired combined metrics, dropping silently",
			zap.Time("received_processing_time", cmk.ProcessingTime),
			zap.Time("current_processing_time", a.processingTime),
		)
		return nil
	}

	var attrSetOpt metric.MeasurementOption
	bytesIn, err := a.aggregate(ctx, cmk, cm)
	if err != nil {
		attrSetOpt = metric.WithAttributeSet(
			attribute.NewSet(append(cmIDAttrs, telemetry.WithFailure())...),
		)
	} else {
		attrSetOpt = metric.WithAttributeSet(
			attribute.NewSet(append(cmIDAttrs, telemetry.WithSuccess())...),
		)
	}

	span.SetAttributes(attribute.Int("bytes_ingested", bytesIn))
	a.cachedEvents.add(cmk.Interval, cmk.ID, cm.EventsTotal)
	a.metrics.BytesProcessed.Add(context.Background(), int64(bytesIn), attrSetOpt)
	return err
}