func CombinedMetricsToBatch()

in aggregators/converter.go [371:550]


// CombinedMetricsToBatch converts the combined metrics in cm into a batch of
// APM events.
//
// For every entry in cm.ServiceMetrics it appends one event per transaction,
// service transaction and span metric, then one service summary event and,
// when the service carries overflow groups with a non-empty estimator, one
// event per overflow kind. If cm.OverflowServicesEstimator is non-empty, the
// service overflow event and the events for any overflowed groups recorded in
// cm.OverflowServices are appended last.
//
// processingTime is forwarded to the overflow converters. aggInterval is
// formatted once and forwarded to every converter.
//
// It returns (nil, nil) when cm is nil or has no service metrics, even if
// overflow data is present. It returns an error when the global labels of a
// service key cannot be unmarshaled.
func CombinedMetricsToBatch(
	cm *aggregationpb.CombinedMetrics,
	processingTime time.Time,
	aggInterval time.Duration,
) (*modelpb.Batch, error) {
	if cm == nil || len(cm.ServiceMetrics) == 0 {
		return nil, nil
	}

	// Overflow-service events are driven by the estimator. The overflow
	// message itself is checked for nil separately, so that an estimator
	// without an accompanying message cannot cause a nil pointer dereference.
	// This mirrors the sm.OverflowGroups nil check used for each service.
	hasOverflowServices := len(cm.OverflowServicesEstimator) > 0
	overflowServices := cm.OverflowServices

	// First pass: count the events so the batch can be preallocated.
	var batchSize int

	// service_summary overflow metric
	if hasOverflowServices {
		batchSize++
		if overflowServices != nil {
			if len(overflowServices.OverflowTransactionsEstimator) > 0 {
				batchSize++
			}
			if len(overflowServices.OverflowServiceTransactionsEstimator) > 0 {
				batchSize++
			}
			if len(overflowServices.OverflowSpansEstimator) > 0 {
				batchSize++
			}
		}
	}

	for _, ksm := range cm.ServiceMetrics {
		sm := ksm.Metrics
		batchSize += len(sm.TransactionMetrics)
		batchSize += len(sm.ServiceTransactionMetrics)
		batchSize += len(sm.SpanMetrics)
		batchSize++ // Each service will create a service summary metric
		if sm.OverflowGroups == nil {
			continue
		}
		if len(sm.OverflowGroups.OverflowTransactionsEstimator) > 0 {
			batchSize++
		}
		if len(sm.OverflowGroups.OverflowServiceTransactionsEstimator) > 0 {
			batchSize++
		}
		if len(sm.OverflowGroups.OverflowSpansEstimator) > 0 {
			batchSize++
		}
	}

	// Second pass: build the events in the same order they were counted.
	b := make(modelpb.Batch, 0, batchSize)
	aggIntervalStr := formatDuration(aggInterval)
	// Captured once so that every per-service base event is created with the
	// same time value.
	now := time.Now()
	for _, ksm := range cm.ServiceMetrics {
		sk, sm := ksm.Key, ksm.Metrics

		// The global labels are decoded once per service and attached to
		// every non-overflow event created for that service.
		var gl globalLabels
		if err := gl.UnmarshalBinary(sk.GlobalLabelsStr); err != nil {
			return nil, fmt.Errorf("failed to unmarshal global labels: %w", err)
		}
		getBaseEventWithLabels := func() *modelpb.APMEvent {
			event := getBaseEvent(sk, now)
			event.Labels = gl.Labels
			event.NumericLabels = gl.NumericLabels
			return event
		}

		// transaction metrics
		for _, ktm := range sm.TransactionMetrics {
			event := getBaseEventWithLabels()
			txnMetricsToAPMEvent(ktm.Key, ktm.Metrics, event, aggIntervalStr)
			b = append(b, event)
		}
		// service transaction metrics
		for _, kstm := range sm.ServiceTransactionMetrics {
			event := getBaseEventWithLabels()
			svcTxnMetricsToAPMEvent(kstm.Key, kstm.Metrics, event, aggIntervalStr)
			b = append(b, event)
		}
		// service destination metrics
		for _, kspm := range sm.SpanMetrics {
			event := getBaseEventWithLabels()
			spanMetricsToAPMEvent(kspm.Key, kspm.Metrics, event, aggIntervalStr)
			b = append(b, event)
		}

		// service summary metrics
		event := getBaseEventWithLabels()
		serviceMetricsToAPMEvent(event, aggIntervalStr)
		b = append(b, event)

		// Per-service overflow events start from the plain base event; the
		// global labels are not attached to them here.
		if sm.OverflowGroups == nil {
			continue
		}
		if len(sm.OverflowGroups.OverflowTransactionsEstimator) > 0 {
			estimator := hllSketch(sm.OverflowGroups.OverflowTransactionsEstimator)
			event := getBaseEvent(sk, now)
			overflowTxnMetricsToAPMEvent(
				processingTime,
				sm.OverflowGroups.OverflowTransactions,
				estimator.Estimate(),
				event,
				aggIntervalStr,
			)
			b = append(b, event)
		}
		if len(sm.OverflowGroups.OverflowServiceTransactionsEstimator) > 0 {
			estimator := hllSketch(
				sm.OverflowGroups.OverflowServiceTransactionsEstimator,
			)
			event := getBaseEvent(sk, now)
			overflowSvcTxnMetricsToAPMEvent(
				processingTime,
				sm.OverflowGroups.OverflowServiceTransactions,
				estimator.Estimate(),
				event,
				aggIntervalStr,
			)
			b = append(b, event)
		}
		if len(sm.OverflowGroups.OverflowSpansEstimator) > 0 {
			estimator := hllSketch(sm.OverflowGroups.OverflowSpansEstimator)
			event := getBaseEvent(sk, now)
			overflowSpanMetricsToAPMEvent(
				processingTime,
				sm.OverflowGroups.OverflowSpans,
				estimator.Estimate(),
				event,
				aggIntervalStr,
			)
			b = append(b, event)
		}
	}

	if hasOverflowServices {
		// The service overflow event only needs the estimator, so it is
		// emitted even when the overflow message is absent.
		estimator := hllSketch(cm.OverflowServicesEstimator)
		event := getOverflowBaseEvent(cm.YoungestEventTimestamp)
		overflowServiceMetricsToAPMEvent(
			processingTime,
			estimator.Estimate(),
			event,
			aggIntervalStr,
		)
		b = append(b, event)

		// Without the overflow message there are no overflowed groups to
		// report for the overflowed services.
		if overflowServices == nil {
			return &b, nil
		}
		if len(overflowServices.OverflowTransactionsEstimator) > 0 {
			estimator := hllSketch(overflowServices.OverflowTransactionsEstimator)
			event := getOverflowBaseEvent(cm.YoungestEventTimestamp)
			overflowTxnMetricsToAPMEvent(
				processingTime,
				overflowServices.OverflowTransactions,
				estimator.Estimate(),
				event,
				aggIntervalStr,
			)
			b = append(b, event)
		}
		if len(overflowServices.OverflowServiceTransactionsEstimator) > 0 {
			estimator := hllSketch(
				overflowServices.OverflowServiceTransactionsEstimator,
			)
			event := getOverflowBaseEvent(cm.YoungestEventTimestamp)
			overflowSvcTxnMetricsToAPMEvent(
				processingTime,
				overflowServices.OverflowServiceTransactions,
				estimator.Estimate(),
				event,
				aggIntervalStr,
			)
			b = append(b, event)
		}
		if len(overflowServices.OverflowSpansEstimator) > 0 {
			estimator := hllSketch(overflowServices.OverflowSpansEstimator)
			event := getOverflowBaseEvent(cm.YoungestEventTimestamp)
			overflowSpanMetricsToAPMEvent(
				processingTime,
				overflowServices.OverflowSpans,
				estimator.Estimate(),
				event,
				aggIntervalStr,
			)
			b = append(b, event)
		}
	}
	return &b, nil
}