func EventToCombinedMetrics()

in aggregators/converter.go [314:368]


// EventToCombinedMetrics turns a single APMEvent into combined metrics and
// hands them to callback, once per partition builder produced for the event.
//
// The key passed to callback is a copy of unpartitionedKey with PartitionID
// set to the builder's partition. The *aggregationpb.CombinedMetrics passed to
// callback is the same pointer on every invocation; its contents are updated
// between invocations and the owning builder is released when this function
// returns, so callback should not retain the pointer beyond the call.
//
// Every builder is visited even if an earlier callback fails. All callback
// errors are joined and returned wrapped in a single error. An error is also
// returned if the global labels cannot be marshaled or if processing the
// event yields no builders at all.
func EventToCombinedMetrics(
	e *modelpb.APMEvent,
	unpartitionedKey CombinedMetricsKey,
	partitions uint16,
	callback func(CombinedMetricsKey, *aggregationpb.CombinedMetrics) error,
) error {
	labels, err := marshalEventGlobalLabels(e)
	if err != nil {
		return fmt.Errorf("failed to marshal global labels: %w", err)
	}

	svc := e.GetService()
	// Align the event timestamp to the start of its aggregation interval.
	bucketStart := modelpb.ToTime(e.GetTimestamp()).Truncate(unpartitionedKey.Interval)

	pmb := getPartitionedMetricsBuilder(
		aggregationpb.ServiceAggregationKey{
			Timestamp:           modelpb.FromTime(bucketStart),
			ServiceName:         svc.GetName(),
			ServiceEnvironment:  svc.GetEnvironment(),
			ServiceLanguageName: svc.GetLanguage().GetName(),
			AgentName:           e.GetAgent().GetName(),
			GlobalLabelsStr:     labels,
		},
		partitions,
	)
	defer pmb.release()

	pmb.processEvent(e)

	builderCount := len(pmb.builders)
	if builderCount == 0 {
		// Every APMEvent is expected to yield at least the service summary
		// metric, so an empty builder set points at a bug in `processEvent`.
		return errors.New("service summary metric must be produced for any event")
	}

	// The event is counted once overall: each partition receives an equal
	// fraction so that the per-partition totals add up to one event.
	pmb.combinedMetrics.EventsTotal = 1 / float64(builderCount)
	pmb.combinedMetrics.YoungestEventTimestamp = e.GetEvent().GetReceived()

	var callbackErrs []error
	for _, builder := range pmb.builders {
		partitionedKey := unpartitionedKey
		partitionedKey.PartitionID = builder.partition

		// Point the shared service metrics at this builder's slices before
		// handing the combined metrics to the callback.
		// NOTE(review): presumably pmb.combinedMetrics references
		// pmb.serviceMetrics; confirm against the builder's setup.
		pmb.serviceMetrics.TransactionMetrics = builder.keyedTransactionMetricsSlice
		pmb.serviceMetrics.ServiceTransactionMetrics = builder.keyedServiceTransactionMetricsSlice
		pmb.serviceMetrics.SpanMetrics = builder.keyedSpanMetricsSlice

		if cbErr := callback(partitionedKey, &pmb.combinedMetrics); cbErr != nil {
			callbackErrs = append(callbackErrs, cbErr)
		}
	}

	// errors.Join yields nil when nothing was collected.
	if joined := errors.Join(callbackErrs...); joined != nil {
		return fmt.Errorf("failed while executing callback: %w", joined)
	}
	return nil
}