aggregators/merger.go (386 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package aggregators

import (
	"io"
	"slices"
	"sort"

	"github.com/cespare/xxhash/v2"

	"github.com/elastic/apm-aggregation/aggregationpb"
	"github.com/elastic/apm-aggregation/aggregators/internal/constraint"
	"github.com/elastic/apm-aggregation/aggregators/internal/protohash"
)

// combinedMetricsMerger accumulates serialized CombinedMetrics protobuf
// payloads into an in-memory combinedMetrics value, enforcing the configured
// cardinality limits (services, transaction/service-transaction/span groups)
// and routing anything over those limits into overflow buckets.
type combinedMetricsMerger struct {
	limits      Limits
	constraints constraints
	metrics     combinedMetrics
}

// MergeNewer merges a serialized CombinedMetrics newer than the accumulated
// state. The merge is commutative with respect to age, so it shares its
// implementation with MergeOlder.
func (m *combinedMetricsMerger) MergeNewer(value []byte) error {
	from := &aggregationpb.CombinedMetrics{}
	if err := from.UnmarshalVT(value); err != nil {
		return err
	}
	m.merge(from)
	return nil
}

// MergeOlder merges a serialized CombinedMetrics older than the accumulated
// state.
func (m *combinedMetricsMerger) MergeOlder(value []byte) error {
	from := &aggregationpb.CombinedMetrics{}
	if err := from.UnmarshalVT(value); err != nil {
		return err
	}
	m.merge(from)
	return nil
}

// Finish serializes the accumulated combined metrics to protobuf wire format.
// The returned io.Closer is always nil; includesBase is unused here.
func (m *combinedMetricsMerger) Finish(includesBase bool) ([]byte, io.Closer, error) {
	pb := m.metrics.ToProto()
	data, err := pb.MarshalVT()
	return data, nil, err
}

// merge folds from into m.metrics, applying per-service and global limits
// and spilling anything over a limit into the corresponding overflow bucket.
func (m *combinedMetricsMerger) merge(from *aggregationpb.CombinedMetrics) {
	// We merge the below fields irrespective of the services present
	// because it is possible for services to be empty if the event
	// does not fit the criteria for aggregations.
	m.metrics.EventsTotal += from.EventsTotal
	if m.metrics.YoungestEventTimestamp < from.YoungestEventTimestamp {
		m.metrics.YoungestEventTimestamp = from.YoungestEventTimestamp
	}
	// If there is overflow due to max services in either of the buckets being
	// merged then we can merge the overflow buckets without considering any
	// other scenarios.
	if len(from.OverflowServicesEstimator) > 0 {
		mergeOverflow(&m.metrics.OverflowServices, from.OverflowServices)
		mergeEstimator(
			&m.metrics.OverflowServicesEstimator,
			hllSketch(from.OverflowServicesEstimator),
		)
	}
	if len(from.ServiceMetrics) == 0 {
		return
	}
	if m.metrics.Services == nil {
		m.metrics.Services = make(map[serviceAggregationKey]serviceMetrics)
	}
	// Iterate over the services in the _from_ combined metrics and merge them
	// into the _to_ combined metrics as per the following rules:
	// 1. If the service in the _from_ bucket is also present in the _to_
	//    bucket then merge them.
	// 2. If the service in the _from_ bucket is not in the _to_ bucket:
	//    2.a. If the _to_ bucket hasn't breached the max services limit then
	//         create a new service in _to_ bucket and merge.
	//    2.b. Else, merge the _from_ bucket to the overflow service bucket
	//         of the _to_ combined metrics.
	for i := range from.ServiceMetrics {
		fromSvc := from.ServiceMetrics[i]
		serviceKeyHash := protohash.HashServiceAggregationKey(xxhash.Digest{}, fromSvc.Key)
		var sk serviceAggregationKey
		sk.FromProto(fromSvc.Key)
		toSvc, svcOverflow := getServiceMetrics(&m.metrics, sk, m.limits.MaxServices)
		if svcOverflow {
			// NOTE(review): this path dereferences fromSvc.Metrics without a
			// nil check while the non-overflow path below guards with
			// `fromSvc.Metrics != nil` — confirm producers always populate
			// Metrics, or this can panic on a nil entry.
			mergeOverflow(&m.metrics.OverflowServices, fromSvc.Metrics.OverflowGroups)
			mergeToOverflowFromServiceMetrics(&m.metrics.OverflowServices, fromSvc.Metrics, serviceKeyHash)
			insertHash(&m.metrics.OverflowServicesEstimator, serviceKeyHash.Sum64())
			continue
		}
		if fromSvc.Metrics != nil {
			mergeOverflow(&toSvc.OverflowGroups, fromSvc.Metrics.OverflowGroups)
			// Per-service constraints start at the current group count so the
			// per-service limit applies to the post-merge size; the global
			// (total) constraints are shared across all services via
			// m.constraints and carry state between iterations.
			mergeTransactionGroups(
				toSvc.TransactionGroups,
				fromSvc.Metrics.TransactionMetrics,
				constraint.New(
					len(toSvc.TransactionGroups),
					m.limits.MaxTransactionGroupsPerService,
				),
				m.constraints.totalTransactionGroups,
				serviceKeyHash,
				&toSvc.OverflowGroups.OverflowTransaction,
			)
			mergeServiceTransactionGroups(
				toSvc.ServiceTransactionGroups,
				fromSvc.Metrics.ServiceTransactionMetrics,
				constraint.New(
					len(toSvc.ServiceTransactionGroups),
					m.limits.MaxServiceTransactionGroupsPerService,
				),
				m.constraints.totalServiceTransactionGroups,
				serviceKeyHash,
				&toSvc.OverflowGroups.OverflowServiceTransaction,
			)
			mergeSpanGroups(
				toSvc.SpanGroups,
				fromSvc.Metrics.SpanMetrics,
				constraint.New(
					len(toSvc.SpanGroups),
					m.limits.MaxSpanGroupsPerService,
				),
				m.constraints.totalSpanGroups,
				serviceKeyHash,
				&toSvc.OverflowGroups.OverflowSpan,
			)
		}
		// Write back: serviceMetrics is stored by value in the map, and
		// getServiceMetrics may have returned a freshly created value.
		m.metrics.Services[sk] = toSvc
	}
}

// mergeTransactionGroups merges transaction aggregation groups for two combined metrics
// considering max transaction groups and max transaction groups per service limits.
// New groups that would exceed either constraint are merged into overflowTo,
// keyed by the xxhash of the (service key, transaction key) for cardinality
// estimation.
func mergeTransactionGroups(
	to map[transactionAggregationKey]*aggregationpb.KeyedTransactionMetrics,
	from []*aggregationpb.KeyedTransactionMetrics,
	perSvcConstraint, globalConstraint *constraint.Constraint,
	hash xxhash.Digest,
	overflowTo *overflowTransaction,
) {
	for i := range from {
		fromTxn := from[i]
		var tk transactionAggregationKey
		tk.FromProto(fromTxn.Key)
		toTxn, ok := to[tk]
		if !ok {
			overflowed := perSvcConstraint.Maxed() || globalConstraint.Maxed()
			if overflowed {
				// hash already covers the service key; extend it with the
				// transaction key to identify the overflowed group.
				fromTxnKeyHash := protohash.HashTransactionAggregationKey(hash, fromTxn.Key)
				overflowTo.Merge(fromTxn.Metrics, fromTxnKeyHash.Sum64())
				continue
			}
			perSvcConstraint.Add(1)
			globalConstraint.Add(1)
			// Clone so the stored metrics do not alias the decoded `from`
			// protobuf, which the caller may reuse or release.
			to[tk] = fromTxn.CloneVT()
			continue
		}
		mergeKeyedTransactionMetrics(toTxn, fromTxn)
	}
}

// mergeServiceTransactionGroups merges service transaction aggregation groups for two
// combined metrics considering max service transaction groups and max service
// transaction groups per service limits.
func mergeServiceTransactionGroups(
	to map[serviceTransactionAggregationKey]*aggregationpb.KeyedServiceTransactionMetrics,
	from []*aggregationpb.KeyedServiceTransactionMetrics,
	perSvcConstraint, globalConstraint *constraint.Constraint,
	hash xxhash.Digest,
	overflowTo *overflowServiceTransaction,
) {
	for i := range from {
		fromSvcTxn := from[i]
		var stk serviceTransactionAggregationKey
		stk.FromProto(fromSvcTxn.Key)
		toSvcTxn, ok := to[stk]
		if !ok {
			overflowed := perSvcConstraint.Maxed() || globalConstraint.Maxed()
			if overflowed {
				// hash already covers the service key; extend it with the
				// service-transaction key for overflow cardinality estimation.
				fromSvcTxnKeyHash := protohash.HashServiceTransactionAggregationKey(hash, fromSvcTxn.Key)
				overflowTo.Merge(fromSvcTxn.Metrics, fromSvcTxnKeyHash.Sum64())
				continue
			}
			perSvcConstraint.Add(1)
			globalConstraint.Add(1)
			// Clone to avoid aliasing the caller-owned `from` protobuf.
			to[stk] = fromSvcTxn.CloneVT()
			continue
		}
		mergeKeyedServiceTransactionMetrics(toSvcTxn, fromSvcTxn)
	}
}

// mergeSpanGroups merges span aggregation groups for two combined metrics considering
// max span groups and max span groups per service limits.
func mergeSpanGroups(
	to map[spanAggregationKey]*aggregationpb.KeyedSpanMetrics,
	from []*aggregationpb.KeyedSpanMetrics,
	perSvcConstraint, globalConstraint *constraint.Constraint,
	hash xxhash.Digest,
	overflowTo *overflowSpan,
) {
	for i := range from {
		fromSpan := from[i]
		var spk spanAggregationKey
		spk.FromProto(fromSpan.Key)
		toSpan, ok := to[spk]
		if !ok {
			// Protect against agents that send high cardinality span names by dropping
			// span.name if more than half of the per svc span group limit is reached.
			originalSpanName := fromSpan.Key.SpanName
			half := perSvcConstraint.Limit() / 2
			if perSvcConstraint.Value() >= half {
				// NOTE: mutates the incoming proto key as well as the lookup
				// key, so subsequent hashing/cloning of fromSpan sees the
				// dropped name (intentionally, except on the overflow path
				// below, which restores it).
				spk.SpanName = ""
				fromSpan.Key.SpanName = ""
				toSpan, ok = to[spk]
			}
			if !ok {
				overflowed := perSvcConstraint.Maxed() || globalConstraint.Maxed()
				if overflowed {
					// Restore span name in case it was dropped above,
					// for cardinality estimation.
					fromSpan.Key.SpanName = originalSpanName
					fromSpanKeyHash := protohash.HashSpanAggregationKey(hash, fromSpan.Key)
					overflowTo.Merge(fromSpan.Metrics, fromSpanKeyHash.Sum64())
					continue
				}
				perSvcConstraint.Add(1)
				globalConstraint.Add(1)
				to[spk] = fromSpan.CloneVT()
				continue
			}
		}
		mergeKeyedSpanMetrics(toSpan, fromSpan)
	}
}

// mergeToOverflowFromServiceMetrics merges all the aggregation groups of a
// service that itself overflowed (max services breached) into the overflow
// buckets of the target combined metrics. hash is the xxhash state seeded
// with the service key, extended per group for cardinality estimation.
func mergeToOverflowFromServiceMetrics(
	to *overflow,
	from *aggregationpb.ServiceMetrics,
	hash xxhash.Digest,
) {
	if from == nil {
		return
	}
	for _, ktm := range from.TransactionMetrics {
		ktmKeyHash := protohash.HashTransactionAggregationKey(hash, ktm.Key)
		to.OverflowTransaction.Merge(ktm.Metrics, ktmKeyHash.Sum64())
	}
	for _, kstm := range from.ServiceTransactionMetrics {
		kstmKeyHash := protohash.HashServiceTransactionAggregationKey(hash, kstm.Key)
		to.OverflowServiceTransaction.Merge(kstm.Metrics, kstmKeyHash.Sum64())
	}
	for _, ksm := range from.SpanMetrics {
		ksmKeyHash := protohash.HashSpanAggregationKey(hash, ksm.Key)
		to.OverflowSpan.Merge(ksm.Metrics, ksmKeyHash.Sum64())
	}
}

// mergeOverflow merges the proto representation of overflow buckets into the
// in-memory overflow value. A nil proto is a no-op.
func mergeOverflow(
	to *overflow,
	fromproto *aggregationpb.Overflow,
) {
	if fromproto == nil {
		return
	}
	var from overflow
	from.FromProto(fromproto)
	to.OverflowTransaction.MergeOverflow(&from.OverflowTransaction)
	to.OverflowServiceTransaction.MergeOverflow(&from.OverflowServiceTransaction)
	to.OverflowSpan.MergeOverflow(&from.OverflowSpan)
}

// mergeKeyedTransactionMetrics merges the metrics of two keyed transaction
// entries with equal keys, lazily allocating the target metrics.
func mergeKeyedTransactionMetrics(
	to, from *aggregationpb.KeyedTransactionMetrics,
) {
	if from.Metrics == nil {
		return
	}
	if to.Metrics == nil {
		to.Metrics = &aggregationpb.TransactionMetrics{}
	}
	mergeTransactionMetrics(to.Metrics, from.Metrics)
}

// mergeTransactionMetrics merges from's latency histogram into to's,
// allocating to's histogram only when from has one.
func mergeTransactionMetrics(
	to, from *aggregationpb.TransactionMetrics,
) {
	if to.Histogram == nil && from.Histogram != nil {
		to.Histogram = &aggregationpb.HDRHistogram{}
	}
	if to.Histogram != nil && from.Histogram != nil {
		mergeHistogram(to.Histogram, from.Histogram)
	}
}

// mergeKeyedServiceTransactionMetrics merges the metrics of two keyed service
// transaction entries with equal keys, lazily allocating the target metrics.
func mergeKeyedServiceTransactionMetrics(
	to, from *aggregationpb.KeyedServiceTransactionMetrics,
) {
	if from.Metrics == nil {
		return
	}
	if to.Metrics == nil {
		to.Metrics = &aggregationpb.ServiceTransactionMetrics{}
	}
	mergeServiceTransactionMetrics(to.Metrics, from.Metrics)
}

// mergeServiceTransactionMetrics merges histograms and success/failure
// counters of two service transaction metrics.
func mergeServiceTransactionMetrics(
	to, from *aggregationpb.ServiceTransactionMetrics,
) {
	if to.Histogram == nil && from.Histogram != nil {
		to.Histogram = &aggregationpb.HDRHistogram{}
	}
	if to.Histogram != nil && from.Histogram != nil {
		mergeHistogram(to.Histogram, from.Histogram)
	}
	to.FailureCount += from.FailureCount
	to.SuccessCount += from.SuccessCount
}

// mergeKeyedSpanMetrics merges the metrics of two keyed span entries with
// equal keys, lazily allocating the target metrics.
func mergeKeyedSpanMetrics(to, from *aggregationpb.KeyedSpanMetrics) {
	if from.Metrics == nil {
		return
	}
	if to.Metrics == nil {
		to.Metrics = &aggregationpb.SpanMetrics{}
	}
	mergeSpanMetrics(to.Metrics, from.Metrics)
}

// mergeSpanMetrics adds from's count and sum into to.
func mergeSpanMetrics(to, from *aggregationpb.SpanMetrics) {
	to.Count += from.Count
	to.Sum += from.Sum
}

// mergeHistogram merges two proto representation of HDRHistogram. The
// merge assumes both histograms are created with same arguments and
// their representations are sorted by bucket.
//
// The merge is performed in place on `to` by growing its slices to the
// final (deduplicated) length once, then filling them backwards with a
// two-pointer merge so that existing entries are never overwritten before
// they are consumed.
func mergeHistogram(to, from *aggregationpb.HDRHistogram) {
	if len(from.Buckets) == 0 {
		return
	}
	if len(to.Buckets) == 0 {
		// `to` is empty: a plain copy preserves the sorted invariant.
		to.Buckets = append(to.Buckets, from.Buckets...)
		to.Counts = append(to.Counts, from.Counts...)
		return
	}
	// Locate the insertion point of from's smallest bucket within to.
	startToIdx, found := sort.Find(len(to.Buckets), func(i int) int {
		return int(from.Buckets[0] - to.Buckets[i])
	})
	if found && len(from.Buckets) == 1 {
		// optimize for single value of `from` also found in `to`
		to.Counts[startToIdx] += from.Counts[0]
		return
	}
	// Since all values of `from` must be greater than the first value, we can
	// limit the search space in `to` to [startToIdx, len(to.Buckets))
	//
	// First pass: compute the merged length by counting common buckets.
	requiredLen := len(to.Buckets) + len(from.Buckets)
	for toIdx, fromIdx := startToIdx, 0; toIdx < len(to.Buckets) && fromIdx < len(from.Buckets); {
		v := to.Buckets[toIdx] - from.Buckets[fromIdx]
		switch {
		case v == 0:
			// For every bucket that is common, we need one less bucket in final slice
			requiredLen--
			toIdx++
			fromIdx++
		case v < 0:
			toIdx++
		case v > 0:
			fromIdx++
		}
	}
	// Second pass: fill backwards from the end of the grown slices.
	toIdx, fromIdx := len(to.Buckets)-1, len(from.Buckets)-1
	to.Buckets = slices.Grow(to.Buckets, requiredLen-len(to.Buckets))[:requiredLen]
	to.Counts = slices.Grow(to.Counts, requiredLen-len(to.Counts))[:requiredLen]
	for idx := len(to.Buckets) - 1; idx >= 0; idx-- {
		if fromIdx < 0 {
			// `from` exhausted; remaining `to` entries are already in place.
			break
		}
		if toIdx < startToIdx {
			// All remaining `from` entries are smaller than any unconsumed
			// `to` entry; bulk-copy them into the gap and finish.
			copy(to.Counts[startToIdx:idx+1], from.Counts[0:fromIdx+1])
			copy(to.Buckets[startToIdx:idx+1], from.Buckets[0:fromIdx+1])
			break
		}
		v := to.Buckets[toIdx] - from.Buckets[fromIdx]
		switch {
		case v == 0:
			// Common bucket: sum the counts.
			to.Counts[idx] = to.Counts[toIdx] + from.Counts[fromIdx]
			to.Buckets[idx] = to.Buckets[toIdx]
			toIdx--
			fromIdx--
		case v > 0:
			to.Counts[idx] = to.Counts[toIdx]
			to.Buckets[idx] = to.Buckets[toIdx]
			toIdx--
		case v < 0:
			to.Counts[idx] = from.Counts[fromIdx]
			to.Buckets[idx] = from.Buckets[fromIdx]
			fromIdx--
		}
	}
}

// getServiceMetrics returns the service metric from a combined metrics based on the
// service key argument, creating one if needed. A second bool return value indicates
// if a service is returned or no service can be created due to max svcs limit breach.
func getServiceMetrics(cm *combinedMetrics, svcKey serviceAggregationKey, maxSvcs int) (serviceMetrics, bool) { srcSvc, ok := cm.Services[svcKey] if !ok { if len(cm.Services) < maxSvcs { return newServiceMetrics(), false } return serviceMetrics{}, true } return srcSvc, false } func newServiceMetrics() serviceMetrics { return serviceMetrics{ TransactionGroups: make(map[transactionAggregationKey]*aggregationpb.KeyedTransactionMetrics), ServiceTransactionGroups: make(map[serviceTransactionAggregationKey]*aggregationpb.KeyedServiceTransactionMetrics), SpanGroups: make(map[spanAggregationKey]*aggregationpb.KeyedSpanMetrics), } } // constraints is a group of constraints to be observed during merge operations. type constraints struct { totalTransactionGroups *constraint.Constraint totalServiceTransactionGroups *constraint.Constraint totalSpanGroups *constraint.Constraint } func newConstraints(limits Limits) constraints { return constraints{ totalTransactionGroups: constraint.New(0, limits.MaxTransactionGroups), totalServiceTransactionGroups: constraint.New(0, limits.MaxServiceTransactionGroups), totalSpanGroups: constraint.New(0, limits.MaxSpanGroups), } } // MergeCombinedMetrics merges provided CombinedMetrics together. func MergeCombinedMetrics(limits Limits, metrics ...*aggregationpb.CombinedMetrics) *aggregationpb.CombinedMetrics { merger := combinedMetricsMerger{ limits: limits, constraints: newConstraints(limits), } for _, m := range metrics { merger.merge(m) } return merger.metrics.ToProto() }