in aggregators/converter.go [314:368]
// EventToCombinedMetrics turns a single APMEvent into combined metrics and
// hands them to callback, once for every metrics builder that processing the
// event produced. Each invocation receives a copy of unpartitionedKey whose
// PartitionID is set to that builder's partition.
//
// The service aggregation key is derived from the event: its timestamp
// truncated to unpartitionedKey.Interval, the service name, environment and
// language name, the agent name and the marshaled global labels.
//
// The *aggregationpb.CombinedMetrics passed to callback is the same pointer on
// every invocation and belongs to a builder that is released when this
// function returns, so callback presumably must not retain it.
// NOTE(review): confirm against getPartitionedMetricsBuilder and release.
//
// An error is returned if the global labels cannot be marshaled, if the event
// produced no builders, or if any callback invocation failed. A failing
// callback does not stop the iteration; all callback errors are collected and
// returned joined together.
func EventToCombinedMetrics(
	e *modelpb.APMEvent,
	unpartitionedKey CombinedMetricsKey,
	partitions uint16,
	callback func(CombinedMetricsKey, *aggregationpb.CombinedMetrics) error,
) error {
	globalLabels, err := marshalEventGlobalLabels(e)
	if err != nil {
		return fmt.Errorf("failed to marshal global labels: %w", err)
	}

	// Align the event timestamp to the start of its aggregation interval.
	svc := e.GetService()
	bucketStart := modelpb.ToTime(e.GetTimestamp()).Truncate(unpartitionedKey.Interval)

	pmb := getPartitionedMetricsBuilder(
		aggregationpb.ServiceAggregationKey{
			Timestamp:           modelpb.FromTime(bucketStart),
			ServiceName:         svc.GetName(),
			ServiceEnvironment:  svc.GetEnvironment(),
			ServiceLanguageName: svc.GetLanguage().GetName(),
			AgentName:           e.GetAgent().GetName(),
			GlobalLabelsStr:     globalLabels,
		},
		partitions,
	)
	defer pmb.release()

	pmb.processEvent(e)

	builderCount := len(pmb.builders)
	if builderCount == 0 {
		// Every APMEvent is expected to yield at least the service summary
		// metric, so ending up here points at a bug in `processEvent`.
		return errors.New("service summary metric must be produced for any event")
	}

	// The event is counted once overall: approximate the events total by
	// spreading it evenly across the partitioned key values.
	pmb.combinedMetrics.EventsTotal = 1 / float64(builderCount)
	pmb.combinedMetrics.YoungestEventTimestamp = e.GetEvent().GetReceived()

	var callbackErrs []error
	for _, builder := range pmb.builders {
		partitionedKey := unpartitionedKey
		partitionedKey.PartitionID = builder.partition

		// Swap this builder's metric slices into the shared service metrics
		// before exposing the combined metrics to the callback.
		pmb.serviceMetrics.TransactionMetrics = builder.keyedTransactionMetricsSlice
		pmb.serviceMetrics.ServiceTransactionMetrics = builder.keyedServiceTransactionMetricsSlice
		pmb.serviceMetrics.SpanMetrics = builder.keyedSpanMetricsSlice

		if cbErr := callback(partitionedKey, &pmb.combinedMetrics); cbErr != nil {
			callbackErrs = append(callbackErrs, cbErr)
		}
	}

	if len(callbackErrs) == 0 {
		return nil
	}
	return fmt.Errorf("failed while executing callback: %w", errors.Join(callbackErrs...))
}