in aggregators/merger.go [49:134]
// merge folds the combined metrics in from into m.metrics.
//
// EventsTotal is summed and YoungestEventTimestamp is raised to the larger of
// the two values unconditionally. Services-level overflow carried by from is
// merged next, and then every service in from is either merged into the
// matching service of m.metrics or, when getServiceMetrics reports an
// overflow, folded into m.metrics.OverflowServices.
//
// A service entry whose Metrics is nil contributes no groups but is still
// accounted for: it is stored in m.metrics.Services, or, on overflow, its key
// hash is added to the overflow services estimator.
func (m *combinedMetricsMerger) merge(from *aggregationpb.CombinedMetrics) {
	// We merge the below fields irrespective of the services present
	// because it is possible for services to be empty if the event
	// does not fit the criteria for aggregations.
	m.metrics.EventsTotal += from.EventsTotal
	if m.metrics.YoungestEventTimestamp < from.YoungestEventTimestamp {
		m.metrics.YoungestEventTimestamp = from.YoungestEventTimestamp
	}
	// If there is overflow due to max services in either of the buckets being
	// merged then we can merge the overflow buckets without considering any
	// other scenarios.
	if len(from.OverflowServicesEstimator) > 0 {
		mergeOverflow(&m.metrics.OverflowServices, from.OverflowServices)
		mergeEstimator(
			&m.metrics.OverflowServicesEstimator,
			hllSketch(from.OverflowServicesEstimator),
		)
	}

	if len(from.ServiceMetrics) == 0 {
		return
	}
	if m.metrics.Services == nil {
		m.metrics.Services = make(map[serviceAggregationKey]serviceMetrics)
	}

	// Iterate over the services in the _from_ combined metrics and merge them
	// into the _to_ combined metrics as per the following rules:
	// 1. If the service in the _from_ bucket is also present in the _to_
	//    bucket then merge them.
	// 2. If the service in the _from_ bucket is not in the _to_ bucket:
	//    2.a. If the _to_ bucket hasn't breached the max services limit then
	//         create a new service in _to_ bucket and merge.
	//    2.b. Else, merge the _from_ bucket to the overflow service bucket
	//         of the _to_ combined metrics.
	for i := range from.ServiceMetrics {
		fromSvc := from.ServiceMetrics[i]
		serviceKeyHash := protohash.HashServiceAggregationKey(xxhash.Digest{}, fromSvc.Key)
		var sk serviceAggregationKey
		sk.FromProto(fromSvc.Key)
		toSvc, svcOverflow := getServiceMetrics(&m.metrics, sk, m.limits.MaxServices)
		if svcOverflow {
			// Guard against a nil Metrics exactly like the non-overflow path
			// below does; reading OverflowGroups through a nil pointer would
			// panic.
			if fromSvc.Metrics != nil {
				mergeOverflow(&m.metrics.OverflowServices, fromSvc.Metrics.OverflowGroups)
				mergeToOverflowFromServiceMetrics(&m.metrics.OverflowServices, fromSvc.Metrics, serviceKeyHash)
			}
			// The service itself overflowed regardless of whether it carried
			// any metrics, so it is always recorded in the estimator.
			insertHash(&m.metrics.OverflowServicesEstimator, serviceKeyHash.Sum64())
			continue
		}
		if fromSvc.Metrics != nil {
			mergeOverflow(&toSvc.OverflowGroups, fromSvc.Metrics.OverflowGroups)
			mergeTransactionGroups(
				toSvc.TransactionGroups,
				fromSvc.Metrics.TransactionMetrics,
				constraint.New(
					len(toSvc.TransactionGroups),
					m.limits.MaxTransactionGroupsPerService,
				),
				m.constraints.totalTransactionGroups,
				serviceKeyHash,
				&toSvc.OverflowGroups.OverflowTransaction,
			)
			mergeServiceTransactionGroups(
				toSvc.ServiceTransactionGroups,
				fromSvc.Metrics.ServiceTransactionMetrics,
				constraint.New(
					len(toSvc.ServiceTransactionGroups),
					m.limits.MaxServiceTransactionGroupsPerService,
				),
				m.constraints.totalServiceTransactionGroups,
				serviceKeyHash,
				&toSvc.OverflowGroups.OverflowServiceTransaction,
			)
			mergeSpanGroups(
				toSvc.SpanGroups,
				fromSvc.Metrics.SpanMetrics,
				constraint.New(
					len(toSvc.SpanGroups),
					m.limits.MaxSpanGroupsPerService,
				),
				m.constraints.totalSpanGroups,
				serviceKeyHash,
				&toSvc.OverflowGroups.OverflowSpan,
			)
		}
		// Store the value back because toSvc is a copy obtained from
		// getServiceMetrics, not a reference into the map.
		m.metrics.Services[sk] = toSvc
	}
}