func()

in aggregators/merger.go [49:134]


func (m *combinedMetricsMerger) merge(from *aggregationpb.CombinedMetrics) {
	// We merge the below fields irrespective of the services present
	// because it is possible for services to be empty if the event
	// does not fit the criteria for aggregations.
	m.metrics.EventsTotal += from.EventsTotal
	if m.metrics.YoungestEventTimestamp < from.YoungestEventTimestamp {
		m.metrics.YoungestEventTimestamp = from.YoungestEventTimestamp
	}
	// If there is overflow due to max services in either of the buckets being
	// merged then we can merge the overflow buckets without considering any
	// other scenarios.
	if len(from.OverflowServicesEstimator) > 0 {
		mergeOverflow(&m.metrics.OverflowServices, from.OverflowServices)
		mergeEstimator(
			&m.metrics.OverflowServicesEstimator,
			hllSketch(from.OverflowServicesEstimator),
		)
	}

	if len(from.ServiceMetrics) == 0 {
		return
	}
	if m.metrics.Services == nil {
		m.metrics.Services = make(map[serviceAggregationKey]serviceMetrics)
	}

	// Iterate over the services in the _from_ combined metrics and merge them
	// into the _to_ combined metrics as per the following rules:
	// 1. If the service in the _from_ bucket is also present in the _to_
	//    bucket then merge them.
	// 2. If the service in the _from_ bucket is not in the _to_ bucket:
	//    2.a. If the _to_ bucket hasn't breached the max services limit then
	//         create a new service in _to_ bucket and merge.
	//    2.b. Else, merge the _from_ bucket to the overflow service bucket
	//         of the _to_ combined metrics.
	for i := range from.ServiceMetrics {
		fromSvc := from.ServiceMetrics[i]
		serviceKeyHash := protohash.HashServiceAggregationKey(xxhash.Digest{}, fromSvc.Key)
		var sk serviceAggregationKey
		sk.FromProto(fromSvc.Key)
		toSvc, svcOverflow := getServiceMetrics(&m.metrics, sk, m.limits.MaxServices)
		if svcOverflow {
			mergeOverflow(&m.metrics.OverflowServices, fromSvc.Metrics.OverflowGroups)
			mergeToOverflowFromServiceMetrics(&m.metrics.OverflowServices, fromSvc.Metrics, serviceKeyHash)
			insertHash(&m.metrics.OverflowServicesEstimator, serviceKeyHash.Sum64())
			continue
		}
		if fromSvc.Metrics != nil {
			mergeOverflow(&toSvc.OverflowGroups, fromSvc.Metrics.OverflowGroups)
			mergeTransactionGroups(
				toSvc.TransactionGroups,
				fromSvc.Metrics.TransactionMetrics,
				constraint.New(
					len(toSvc.TransactionGroups),
					m.limits.MaxTransactionGroupsPerService,
				),
				m.constraints.totalTransactionGroups,
				serviceKeyHash,
				&toSvc.OverflowGroups.OverflowTransaction,
			)
			mergeServiceTransactionGroups(
				toSvc.ServiceTransactionGroups,
				fromSvc.Metrics.ServiceTransactionMetrics,
				constraint.New(
					len(toSvc.ServiceTransactionGroups),
					m.limits.MaxServiceTransactionGroupsPerService,
				),
				m.constraints.totalServiceTransactionGroups,
				serviceKeyHash,
				&toSvc.OverflowGroups.OverflowServiceTransaction,
			)
			mergeSpanGroups(
				toSvc.SpanGroups,
				fromSvc.Metrics.SpanMetrics,
				constraint.New(
					len(toSvc.SpanGroups),
					m.limits.MaxSpanGroupsPerService,
				),
				m.constraints.totalSpanGroups,
				serviceKeyHash,
				&toSvc.OverflowGroups.OverflowSpan,
			)
		}
		m.metrics.Services[sk] = toSvc
	}
}