in aggregators/converter.go [371:550]
func CombinedMetricsToBatch(
cm *aggregationpb.CombinedMetrics,
processingTime time.Time,
aggInterval time.Duration,
) (*modelpb.Batch, error) {
if cm == nil || len(cm.ServiceMetrics) == 0 {
return nil, nil
}
var batchSize int
// service_summary overflow metric
if len(cm.OverflowServicesEstimator) > 0 {
batchSize++
if len(cm.OverflowServices.OverflowTransactionsEstimator) > 0 {
batchSize++
}
if len(cm.OverflowServices.OverflowServiceTransactionsEstimator) > 0 {
batchSize++
}
if len(cm.OverflowServices.OverflowSpansEstimator) > 0 {
batchSize++
}
}
for _, ksm := range cm.ServiceMetrics {
sm := ksm.Metrics
batchSize += len(sm.TransactionMetrics)
batchSize += len(sm.ServiceTransactionMetrics)
batchSize += len(sm.SpanMetrics)
batchSize++ // Each service will create a service summary metric
if sm.OverflowGroups == nil {
continue
}
if len(sm.OverflowGroups.OverflowTransactionsEstimator) > 0 {
batchSize++
}
if len(sm.OverflowGroups.OverflowServiceTransactionsEstimator) > 0 {
batchSize++
}
if len(sm.OverflowGroups.OverflowSpansEstimator) > 0 {
batchSize++
}
}
b := make(modelpb.Batch, 0, batchSize)
aggIntervalStr := formatDuration(aggInterval)
now := time.Now()
for _, ksm := range cm.ServiceMetrics {
sk, sm := ksm.Key, ksm.Metrics
var gl globalLabels
if err := gl.UnmarshalBinary(sk.GlobalLabelsStr); err != nil {
return nil, fmt.Errorf("failed to unmarshal global labels: %w", err)
}
getBaseEventWithLabels := func() *modelpb.APMEvent {
event := getBaseEvent(sk, now)
event.Labels = gl.Labels
event.NumericLabels = gl.NumericLabels
return event
}
// transaction metrics
for _, ktm := range sm.TransactionMetrics {
event := getBaseEventWithLabels()
txnMetricsToAPMEvent(ktm.Key, ktm.Metrics, event, aggIntervalStr)
b = append(b, event)
}
// service transaction metrics
for _, kstm := range sm.ServiceTransactionMetrics {
event := getBaseEventWithLabels()
svcTxnMetricsToAPMEvent(kstm.Key, kstm.Metrics, event, aggIntervalStr)
b = append(b, event)
}
// service destination metrics
for _, kspm := range sm.SpanMetrics {
event := getBaseEventWithLabels()
spanMetricsToAPMEvent(kspm.Key, kspm.Metrics, event, aggIntervalStr)
b = append(b, event)
}
// service summary metrics
event := getBaseEventWithLabels()
serviceMetricsToAPMEvent(event, aggIntervalStr)
b = append(b, event)
if sm.OverflowGroups == nil {
continue
}
if len(sm.OverflowGroups.OverflowTransactionsEstimator) > 0 {
estimator := hllSketch(sm.OverflowGroups.OverflowTransactionsEstimator)
event := getBaseEvent(sk, now)
overflowTxnMetricsToAPMEvent(
processingTime,
sm.OverflowGroups.OverflowTransactions,
estimator.Estimate(),
event,
aggIntervalStr,
)
b = append(b, event)
}
if len(sm.OverflowGroups.OverflowServiceTransactionsEstimator) > 0 {
estimator := hllSketch(
sm.OverflowGroups.OverflowServiceTransactionsEstimator,
)
event := getBaseEvent(sk, now)
overflowSvcTxnMetricsToAPMEvent(
processingTime,
sm.OverflowGroups.OverflowServiceTransactions,
estimator.Estimate(),
event,
aggIntervalStr,
)
b = append(b, event)
}
if len(sm.OverflowGroups.OverflowSpansEstimator) > 0 {
estimator := hllSketch(sm.OverflowGroups.OverflowSpansEstimator)
event := getBaseEvent(sk, now)
overflowSpanMetricsToAPMEvent(
processingTime,
sm.OverflowGroups.OverflowSpans,
estimator.Estimate(),
event,
aggIntervalStr,
)
b = append(b, event)
}
}
if len(cm.OverflowServicesEstimator) > 0 {
estimator := hllSketch(cm.OverflowServicesEstimator)
event := getOverflowBaseEvent(cm.YoungestEventTimestamp)
overflowServiceMetricsToAPMEvent(
processingTime,
estimator.Estimate(),
event,
aggIntervalStr,
)
b = append(b, event)
if len(cm.OverflowServices.OverflowTransactionsEstimator) > 0 {
estimator := hllSketch(cm.OverflowServices.OverflowTransactionsEstimator)
event := getOverflowBaseEvent(cm.YoungestEventTimestamp)
overflowTxnMetricsToAPMEvent(
processingTime,
cm.OverflowServices.OverflowTransactions,
estimator.Estimate(),
event,
aggIntervalStr,
)
b = append(b, event)
}
if len(cm.OverflowServices.OverflowServiceTransactionsEstimator) > 0 {
estimator := hllSketch(
cm.OverflowServices.OverflowServiceTransactionsEstimator,
)
event := getOverflowBaseEvent(cm.YoungestEventTimestamp)
overflowSvcTxnMetricsToAPMEvent(
processingTime,
cm.OverflowServices.OverflowServiceTransactions,
estimator.Estimate(),
event,
aggIntervalStr,
)
b = append(b, event)
}
if len(cm.OverflowServices.OverflowSpansEstimator) > 0 {
estimator := hllSketch(cm.OverflowServices.OverflowSpansEstimator)
event := getOverflowBaseEvent(cm.YoungestEventTimestamp)
overflowSpanMetricsToAPMEvent(
processingTime,
cm.OverflowServices.OverflowSpans,
estimator.Estimate(),
event,
aggIntervalStr,
)
b = append(b, event)
}
}
return &b, nil
}