aggregators/converter.go (924 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package aggregators import ( "errors" "fmt" "math" "slices" "sort" "sync" "time" "github.com/cespare/xxhash/v2" "github.com/elastic/apm-aggregation/aggregationpb" "github.com/elastic/apm-aggregation/aggregators/internal/hdrhistogram" "github.com/elastic/apm-aggregation/aggregators/internal/protohash" "github.com/elastic/apm-aggregation/aggregators/nullable" "github.com/elastic/apm-data/model/modelpb" ) const ( spanMetricsetName = "service_destination" txnMetricsetName = "transaction" svcTxnMetricsetName = "service_transaction" summaryMetricsetName = "service_summary" overflowBucketName = "_other" ) var ( partitionedMetricsBuilderPool sync.Pool eventMetricsBuilderPool sync.Pool ) // partitionedMetricsBuilder provides support for building partitioned // sets of metrics from an event. type partitionedMetricsBuilder struct { partitions uint16 serviceHash xxhash.Digest builders []*eventMetricsBuilder // partitioned metrics // Event metrics are for exactly one service, so we create an array of a // single element and use that for backing the slice in CombinedMetrics. serviceAggregationKey aggregationpb.ServiceAggregationKey serviceMetrics aggregationpb.ServiceMetrics keyedServiceMetrics aggregationpb.KeyedServiceMetrics keyedServiceMetricsArray [1]*aggregationpb.KeyedServiceMetrics // We reuse a single CombinedMetrics for all partitions, by iterating // through each partition's metrics and setting them on the CombinedMetrics // before invoking the callback in eventToCombinedMetrics. 
combinedMetrics aggregationpb.CombinedMetrics } func getPartitionedMetricsBuilder( serviceAggregationKey aggregationpb.ServiceAggregationKey, partitions uint16, ) *partitionedMetricsBuilder { p, ok := partitionedMetricsBuilderPool.Get().(*partitionedMetricsBuilder) if !ok { p = &partitionedMetricsBuilder{} p.keyedServiceMetrics.Key = &p.serviceAggregationKey p.keyedServiceMetrics.Metrics = &p.serviceMetrics p.keyedServiceMetricsArray[0] = &p.keyedServiceMetrics p.combinedMetrics.ServiceMetrics = p.keyedServiceMetricsArray[:] } p.serviceAggregationKey = serviceAggregationKey p.serviceHash = protohash.HashServiceAggregationKey(xxhash.Digest{}, &p.serviceAggregationKey) p.partitions = partitions return p } // release releases all partitioned builders back to their pools. // Objects will be reset as needed if/when the builder is reacquired. func (p *partitionedMetricsBuilder) release() { for i, mb := range p.builders { mb.release() p.builders[i] = nil } p.builders = p.builders[:0] partitionedMetricsBuilderPool.Put(p) } func (p *partitionedMetricsBuilder) processEvent(e *modelpb.APMEvent) { switch e.Type() { case modelpb.TransactionEventType: repCount := e.GetTransaction().GetRepresentativeCount() if repCount <= 0 { p.addServiceSummaryMetrics() return } duration := time.Duration(e.GetEvent().GetDuration()) p.addTransactionMetrics(e, repCount, duration) p.addServiceTransactionMetrics(e, repCount, duration) for _, dss := range e.GetTransaction().GetDroppedSpansStats() { p.addDroppedSpanStatsMetrics(dss, repCount) } case modelpb.SpanEventType: target := e.GetService().GetTarget() repCount := e.GetSpan().GetRepresentativeCount() destSvc := e.GetSpan().GetDestinationService().GetResource() if repCount <= 0 || (target == nil && destSvc == "") { p.addServiceSummaryMetrics() return } p.addSpanMetrics(e, repCount) default: // All other event types should add an empty service metrics, // for adding to service summary metrics. 
p.addServiceSummaryMetrics() } } func (p *partitionedMetricsBuilder) addTransactionMetrics(e *modelpb.APMEvent, count float64, duration time.Duration) { var key aggregationpb.TransactionAggregationKey setTransactionKey(e, &key) hash := protohash.HashTransactionAggregationKey(p.serviceHash, &key) mb := p.get(hash) mb.transactionAggregationKey = key hdr := hdrhistogram.New() hdr.RecordDuration(duration, count) setHistogramProto(hdr, &mb.transactionHistogram) mb.transactionMetrics.Histogram = &mb.transactionHistogram mb.keyedTransactionMetricsSlice = mb.keyedTransactionMetricsArray[:] } func (p *partitionedMetricsBuilder) addServiceTransactionMetrics(e *modelpb.APMEvent, count float64, duration time.Duration) { var key aggregationpb.ServiceTransactionAggregationKey setServiceTransactionKey(e, &key) hash := protohash.HashServiceTransactionAggregationKey(p.serviceHash, &key) mb := p.get(hash) mb.serviceTransactionAggregationKey = key if mb.transactionMetrics.Histogram == nil { // mb.TransactionMetrics.Histogram will be set if the event's // transaction metric ended up in the same partition. 
hdr := hdrhistogram.New() hdr.RecordDuration(duration, count) setHistogramProto(hdr, &mb.transactionHistogram) } mb.serviceTransactionMetrics.Histogram = &mb.transactionHistogram switch e.GetEvent().GetOutcome() { case "failure": mb.serviceTransactionMetrics.SuccessCount = 0 mb.serviceTransactionMetrics.FailureCount = count case "success": mb.serviceTransactionMetrics.SuccessCount = count mb.serviceTransactionMetrics.FailureCount = 0 default: mb.serviceTransactionMetrics.SuccessCount = 0 mb.serviceTransactionMetrics.FailureCount = 0 } mb.keyedServiceTransactionMetricsSlice = mb.keyedServiceTransactionMetricsArray[:] } func (p *partitionedMetricsBuilder) addDroppedSpanStatsMetrics(dss *modelpb.DroppedSpanStats, repCount float64) { var key aggregationpb.SpanAggregationKey setDroppedSpanStatsKey(dss, &key) hash := protohash.HashSpanAggregationKey(p.serviceHash, &key) mb := p.get(hash) i := len(mb.keyedSpanMetricsSlice) if i == len(mb.keyedSpanMetricsArray) { // No more capacity. The spec says that when 128 dropped span // stats entries are reached, then any remaining entries will // be silently discarded. 
return } mb.spanAggregationKey[i] = key setDroppedSpanStatsMetrics(dss, repCount, &mb.spanMetrics[i]) mb.keyedSpanMetrics[i].Key = &mb.spanAggregationKey[i] mb.keyedSpanMetrics[i].Metrics = &mb.spanMetrics[i] mb.keyedSpanMetricsSlice = append(mb.keyedSpanMetricsSlice, &mb.keyedSpanMetrics[i]) } func (p *partitionedMetricsBuilder) addSpanMetrics(e *modelpb.APMEvent, repCount float64) { var key aggregationpb.SpanAggregationKey setSpanKey(e, &key) hash := protohash.HashSpanAggregationKey(p.serviceHash, &key) mb := p.get(hash) i := len(mb.keyedSpanMetricsSlice) mb.spanAggregationKey[i] = key setSpanMetrics(e, repCount, &mb.spanMetrics[i]) mb.keyedSpanMetrics[i].Key = &mb.spanAggregationKey[i] mb.keyedSpanMetrics[i].Metrics = &mb.spanMetrics[i] mb.keyedSpanMetricsSlice = append(mb.keyedSpanMetricsSlice, &mb.keyedSpanMetrics[i]) } func (p *partitionedMetricsBuilder) addServiceSummaryMetrics() { // There are no actual metric values, we're just want to // create documents for the dimensions, so we can build a // list of services. _ = p.get(p.serviceHash) } func (p *partitionedMetricsBuilder) get(h xxhash.Digest) *eventMetricsBuilder { partition := uint16(h.Sum64() % uint64(p.partitions)) for _, mb := range p.builders { if mb.partition == partition { return mb } } mb := getEventMetricsBuilder(partition) p.builders = append(p.builders, mb) return mb } // eventMetricsBuilder holds memory for the contents of per-partition // ServiceMetrics. Each instance of the struct is capable of holding // as many metrics as may be produced for a single event. 
// // For each metric type, the builder holds: // - an array with enough capacity to hold the maximum possible // number of that type // - a slice of the array, for tracking the actual number of // metrics of that type that will be produced; this is what // will be used for setting CombinedMetrics fields // - for each array element, space for an aggregation key // - for each array element, space for the metric values type eventMetricsBuilder struct { partition uint16 // Preallocate space for a single-valued histogram. This histogram may // be used for either or both transaction and service transaction metrics. transactionHDRHistogramRepresentation *hdrhistogram.HistogramRepresentation transactionHistogramCounts [1]int64 transactionHistogramBuckets [1]int32 transactionHistogram aggregationpb.HDRHistogram // There can be at most 1 transaction metric per event. transactionAggregationKey aggregationpb.TransactionAggregationKey transactionMetrics aggregationpb.TransactionMetrics keyedTransactionMetrics aggregationpb.KeyedTransactionMetrics keyedTransactionMetricsArray [1]*aggregationpb.KeyedTransactionMetrics keyedTransactionMetricsSlice []*aggregationpb.KeyedTransactionMetrics // There can be at most 1 service transaction metric per event. 
serviceTransactionAggregationKey aggregationpb.ServiceTransactionAggregationKey serviceTransactionMetrics aggregationpb.ServiceTransactionMetrics keyedServiceTransactionMetrics aggregationpb.KeyedServiceTransactionMetrics keyedServiceTransactionMetricsArray [1]*aggregationpb.KeyedServiceTransactionMetrics keyedServiceTransactionMetricsSlice []*aggregationpb.KeyedServiceTransactionMetrics // There can be at most 128 span metrics per event: // - exactly 1 for a span event // - at most 128 (dropped span stats) for a transaction event (1) // // (1) https://github.com/elastic/apm/blob/main/specs/agents/handling-huge-traces/tracing-spans-dropped-stats.md#limits spanAggregationKey [128]aggregationpb.SpanAggregationKey spanMetrics [128]aggregationpb.SpanMetrics keyedSpanMetrics [128]aggregationpb.KeyedSpanMetrics keyedSpanMetricsArray [128]*aggregationpb.KeyedSpanMetrics keyedSpanMetricsSlice []*aggregationpb.KeyedSpanMetrics } func getEventMetricsBuilder(partition uint16) *eventMetricsBuilder { mb, ok := eventMetricsBuilderPool.Get().(*eventMetricsBuilder) if ok { mb.partition = partition // Explicitly reset instead of invoking `Reset` to avoid extra cost due to // additional protobuf specfic resetting logic implemented by `Reset`. 
mb.serviceTransactionMetrics = aggregationpb.ServiceTransactionMetrics{} mb.transactionMetrics = aggregationpb.TransactionMetrics{} for i := range mb.spanMetrics { mb.spanMetrics[i] = aggregationpb.SpanMetrics{} } mb.transactionHDRHistogramRepresentation.CountsRep.Reset() mb.keyedServiceTransactionMetricsSlice = mb.keyedServiceTransactionMetricsSlice[:0] mb.keyedTransactionMetricsSlice = mb.keyedTransactionMetricsSlice[:0] mb.keyedSpanMetricsSlice = mb.keyedSpanMetricsSlice[:0] return mb } mb = &eventMetricsBuilder{partition: partition} mb.transactionHDRHistogramRepresentation = hdrhistogram.New() mb.transactionHistogram.Counts = mb.transactionHistogramCounts[:0] mb.transactionHistogram.Buckets = mb.transactionHistogramBuckets[:0] mb.transactionMetrics.Histogram = nil mb.keyedTransactionMetrics.Key = &mb.transactionAggregationKey mb.keyedTransactionMetrics.Metrics = &mb.transactionMetrics mb.keyedTransactionMetricsArray[0] = &mb.keyedTransactionMetrics mb.keyedTransactionMetricsSlice = mb.keyedTransactionMetricsArray[:0] mb.keyedServiceTransactionMetrics.Key = &mb.serviceTransactionAggregationKey mb.keyedServiceTransactionMetrics.Metrics = &mb.serviceTransactionMetrics mb.keyedServiceTransactionMetricsArray[0] = &mb.keyedServiceTransactionMetrics mb.keyedServiceTransactionMetricsSlice = mb.keyedServiceTransactionMetricsArray[:0] mb.keyedSpanMetricsSlice = mb.keyedSpanMetricsArray[:0] return mb } // release releases the builder back to the pool. // Objects will be reset as needed if/when the builder is reacquired. func (mb *eventMetricsBuilder) release() { eventMetricsBuilderPool.Put(mb) } // EventToCombinedMetrics converts APMEvent to one or more CombinedMetrics and // calls the provided callback for each pair of CombinedMetricsKey and // CombinedMetrics. The callback MUST NOT hold the reference of the passed // CombinedMetrics. If required, the callback can call CloneVT to clone the // CombinedMetrics. 
If an event results in multiple metrics, they may be spread // across different partitions. // // EventToCombinedMetrics will never produce overflow metrics, as it applies to a // single APMEvent. func EventToCombinedMetrics( e *modelpb.APMEvent, unpartitionedKey CombinedMetricsKey, partitions uint16, callback func(CombinedMetricsKey, *aggregationpb.CombinedMetrics) error, ) error { globalLabels, err := marshalEventGlobalLabels(e) if err != nil { return fmt.Errorf("failed to marshal global labels: %w", err) } pmb := getPartitionedMetricsBuilder( aggregationpb.ServiceAggregationKey{ Timestamp: modelpb.FromTime( modelpb.ToTime(e.GetTimestamp()).Truncate(unpartitionedKey.Interval), ), ServiceName: e.GetService().GetName(), ServiceEnvironment: e.GetService().GetEnvironment(), ServiceLanguageName: e.GetService().GetLanguage().GetName(), AgentName: e.GetAgent().GetName(), GlobalLabelsStr: globalLabels, }, partitions, ) defer pmb.release() pmb.processEvent(e) if len(pmb.builders) == 0 { // This is unexpected state as any APMEvent must result in atleast the // service summary metric. If such a state happens then it would indicate // a bug in `processEvent`. return fmt.Errorf("service summary metric must be produced for any event") } // Approximate events total by uniformly distributing the events total // amongst the partitioned key values. 
pmb.combinedMetrics.EventsTotal = 1 / float64(len(pmb.builders)) pmb.combinedMetrics.YoungestEventTimestamp = e.GetEvent().GetReceived() var errs []error for _, mb := range pmb.builders { key := unpartitionedKey key.PartitionID = mb.partition pmb.serviceMetrics.TransactionMetrics = mb.keyedTransactionMetricsSlice pmb.serviceMetrics.ServiceTransactionMetrics = mb.keyedServiceTransactionMetricsSlice pmb.serviceMetrics.SpanMetrics = mb.keyedSpanMetricsSlice if err := callback(key, &pmb.combinedMetrics); err != nil { errs = append(errs, err) } } if len(errs) > 0 { return fmt.Errorf("failed while executing callback: %w", errors.Join(errs...)) } return nil } // CombinedMetricsToBatch converts CombinedMetrics to a batch of APMEvents. func CombinedMetricsToBatch( cm *aggregationpb.CombinedMetrics, processingTime time.Time, aggInterval time.Duration, ) (*modelpb.Batch, error) { if cm == nil || len(cm.ServiceMetrics) == 0 { return nil, nil } var batchSize int // service_summary overflow metric if len(cm.OverflowServicesEstimator) > 0 { batchSize++ if len(cm.OverflowServices.OverflowTransactionsEstimator) > 0 { batchSize++ } if len(cm.OverflowServices.OverflowServiceTransactionsEstimator) > 0 { batchSize++ } if len(cm.OverflowServices.OverflowSpansEstimator) > 0 { batchSize++ } } for _, ksm := range cm.ServiceMetrics { sm := ksm.Metrics batchSize += len(sm.TransactionMetrics) batchSize += len(sm.ServiceTransactionMetrics) batchSize += len(sm.SpanMetrics) batchSize++ // Each service will create a service summary metric if sm.OverflowGroups == nil { continue } if len(sm.OverflowGroups.OverflowTransactionsEstimator) > 0 { batchSize++ } if len(sm.OverflowGroups.OverflowServiceTransactionsEstimator) > 0 { batchSize++ } if len(sm.OverflowGroups.OverflowSpansEstimator) > 0 { batchSize++ } } b := make(modelpb.Batch, 0, batchSize) aggIntervalStr := formatDuration(aggInterval) now := time.Now() for _, ksm := range cm.ServiceMetrics { sk, sm := ksm.Key, ksm.Metrics var gl globalLabels 
if err := gl.UnmarshalBinary(sk.GlobalLabelsStr); err != nil { return nil, fmt.Errorf("failed to unmarshal global labels: %w", err) } getBaseEventWithLabels := func() *modelpb.APMEvent { event := getBaseEvent(sk, now) event.Labels = gl.Labels event.NumericLabels = gl.NumericLabels return event } // transaction metrics for _, ktm := range sm.TransactionMetrics { event := getBaseEventWithLabels() txnMetricsToAPMEvent(ktm.Key, ktm.Metrics, event, aggIntervalStr) b = append(b, event) } // service transaction metrics for _, kstm := range sm.ServiceTransactionMetrics { event := getBaseEventWithLabels() svcTxnMetricsToAPMEvent(kstm.Key, kstm.Metrics, event, aggIntervalStr) b = append(b, event) } // service destination metrics for _, kspm := range sm.SpanMetrics { event := getBaseEventWithLabels() spanMetricsToAPMEvent(kspm.Key, kspm.Metrics, event, aggIntervalStr) b = append(b, event) } // service summary metrics event := getBaseEventWithLabels() serviceMetricsToAPMEvent(event, aggIntervalStr) b = append(b, event) if sm.OverflowGroups == nil { continue } if len(sm.OverflowGroups.OverflowTransactionsEstimator) > 0 { estimator := hllSketch(sm.OverflowGroups.OverflowTransactionsEstimator) event := getBaseEvent(sk, now) overflowTxnMetricsToAPMEvent( processingTime, sm.OverflowGroups.OverflowTransactions, estimator.Estimate(), event, aggIntervalStr, ) b = append(b, event) } if len(sm.OverflowGroups.OverflowServiceTransactionsEstimator) > 0 { estimator := hllSketch( sm.OverflowGroups.OverflowServiceTransactionsEstimator, ) event := getBaseEvent(sk, now) overflowSvcTxnMetricsToAPMEvent( processingTime, sm.OverflowGroups.OverflowServiceTransactions, estimator.Estimate(), event, aggIntervalStr, ) b = append(b, event) } if len(sm.OverflowGroups.OverflowSpansEstimator) > 0 { estimator := hllSketch(sm.OverflowGroups.OverflowSpansEstimator) event := getBaseEvent(sk, now) overflowSpanMetricsToAPMEvent( processingTime, sm.OverflowGroups.OverflowSpans, estimator.Estimate(), event, 
aggIntervalStr, ) b = append(b, event) } } if len(cm.OverflowServicesEstimator) > 0 { estimator := hllSketch(cm.OverflowServicesEstimator) event := getOverflowBaseEvent(cm.YoungestEventTimestamp) overflowServiceMetricsToAPMEvent( processingTime, estimator.Estimate(), event, aggIntervalStr, ) b = append(b, event) if len(cm.OverflowServices.OverflowTransactionsEstimator) > 0 { estimator := hllSketch(cm.OverflowServices.OverflowTransactionsEstimator) event := getOverflowBaseEvent(cm.YoungestEventTimestamp) overflowTxnMetricsToAPMEvent( processingTime, cm.OverflowServices.OverflowTransactions, estimator.Estimate(), event, aggIntervalStr, ) b = append(b, event) } if len(cm.OverflowServices.OverflowServiceTransactionsEstimator) > 0 { estimator := hllSketch( cm.OverflowServices.OverflowServiceTransactionsEstimator, ) event := getOverflowBaseEvent(cm.YoungestEventTimestamp) overflowSvcTxnMetricsToAPMEvent( processingTime, cm.OverflowServices.OverflowServiceTransactions, estimator.Estimate(), event, aggIntervalStr, ) b = append(b, event) } if len(cm.OverflowServices.OverflowSpansEstimator) > 0 { estimator := hllSketch(cm.OverflowServices.OverflowSpansEstimator) event := getOverflowBaseEvent(cm.YoungestEventTimestamp) overflowSpanMetricsToAPMEvent( processingTime, cm.OverflowServices.OverflowSpans, estimator.Estimate(), event, aggIntervalStr, ) b = append(b, event) } } return &b, nil } func setSpanMetrics(e *modelpb.APMEvent, repCount float64, out *aggregationpb.SpanMetrics) { var count uint32 = 1 duration := time.Duration(e.GetEvent().GetDuration()) if composite := e.GetSpan().GetComposite(); composite != nil { count = composite.GetCount() duration = time.Duration(composite.GetSum() * float64(time.Millisecond)) } out.Count = float64(count) * repCount out.Sum = float64(duration) * repCount } func setDroppedSpanStatsMetrics(dss *modelpb.DroppedSpanStats, repCount float64, out *aggregationpb.SpanMetrics) { out.Count = float64(dss.GetDuration().GetCount()) * repCount out.Sum = 
float64(dss.GetDuration().GetSum()) * repCount } func getBaseEvent( key *aggregationpb.ServiceAggregationKey, received time.Time, ) *modelpb.APMEvent { event := &modelpb.APMEvent{} event.Timestamp = key.Timestamp event.Metricset = &modelpb.Metricset{} event.Service = &modelpb.Service{} event.Service.Name = key.ServiceName event.Service.Environment = key.ServiceEnvironment if key.ServiceLanguageName != "" { event.Service.Language = &modelpb.Language{} event.Service.Language.Name = key.ServiceLanguageName } if key.AgentName != "" { event.Agent = &modelpb.Agent{} event.Agent.Name = key.AgentName } event.Event = &modelpb.Event{} event.Event.Received = modelpb.FromTime(received) return event } func getOverflowBaseEvent(youngestEventTS uint64) *modelpb.APMEvent { e := &modelpb.APMEvent{} e.Metricset = &modelpb.Metricset{} e.Service = &modelpb.Service{} e.Service.Name = overflowBucketName e.Event = &modelpb.Event{} e.Event.Received = youngestEventTS return e } func serviceMetricsToAPMEvent( baseEvent *modelpb.APMEvent, intervalStr string, ) { // Most service keys will already be present in the base event if baseEvent.Metricset == nil { baseEvent.Metricset = &modelpb.Metricset{} } baseEvent.Metricset.Name = summaryMetricsetName baseEvent.Metricset.Interval = intervalStr } func txnMetricsToAPMEvent( key *aggregationpb.TransactionAggregationKey, metrics *aggregationpb.TransactionMetrics, baseEvent *modelpb.APMEvent, intervalStr string, ) { histogram := hdrhistogram.New() histogramFromProto(histogram, metrics.Histogram) totalCount, counts, values := histogram.Buckets() eventSuccessCount := &modelpb.SummaryMetric{} switch key.EventOutcome { case "success": eventSuccessCount.Count = totalCount eventSuccessCount.Sum = float64(totalCount) case "failure": eventSuccessCount.Count = totalCount case "unknown": // Keep both Count and Sum as 0. 
} transactionDurationSummary := &modelpb.SummaryMetric{} transactionDurationSummary.Count = totalCount for i, v := range values { transactionDurationSummary.Sum += v * float64(counts[i]) } if baseEvent.Transaction == nil { baseEvent.Transaction = &modelpb.Transaction{} } baseEvent.Transaction.Name = key.TransactionName baseEvent.Transaction.Type = key.TransactionType baseEvent.Transaction.Result = key.TransactionResult baseEvent.Transaction.Root = key.TraceRoot baseEvent.Transaction.DurationSummary = transactionDurationSummary baseEvent.Transaction.DurationHistogram = &modelpb.Histogram{} baseEvent.Transaction.DurationHistogram.Counts = counts baseEvent.Transaction.DurationHistogram.Values = values if baseEvent.Metricset == nil { baseEvent.Metricset = &modelpb.Metricset{} } baseEvent.Metricset.Name = txnMetricsetName baseEvent.Metricset.DocCount = totalCount baseEvent.Metricset.Interval = intervalStr if baseEvent.Event == nil { baseEvent.Event = &modelpb.Event{} } baseEvent.Event.Outcome = key.EventOutcome baseEvent.Event.SuccessCount = eventSuccessCount if key.ContainerId != "" { if baseEvent.Container == nil { baseEvent.Container = &modelpb.Container{} } baseEvent.Container.Id = key.ContainerId } if key.KubernetesPodName != "" { if baseEvent.Kubernetes == nil { baseEvent.Kubernetes = &modelpb.Kubernetes{} } baseEvent.Kubernetes.PodName = key.KubernetesPodName } if key.ServiceVersion != "" { if baseEvent.Service == nil { baseEvent.Service = &modelpb.Service{} } baseEvent.Service.Version = key.ServiceVersion } if key.ServiceNodeName != "" { if baseEvent.Service == nil { baseEvent.Service = &modelpb.Service{} } if baseEvent.Service.Node == nil { baseEvent.Service.Node = &modelpb.ServiceNode{} } baseEvent.Service.Node.Name = key.ServiceNodeName } if key.ServiceRuntimeName != "" || key.ServiceRuntimeVersion != "" { if baseEvent.Service == nil { baseEvent.Service = &modelpb.Service{} } if baseEvent.Service.Runtime == nil { baseEvent.Service.Runtime = &modelpb.Runtime{} 
} baseEvent.Service.Runtime.Name = key.ServiceRuntimeName baseEvent.Service.Runtime.Version = key.ServiceRuntimeVersion } if key.ServiceLanguageVersion != "" { if baseEvent.Service == nil { baseEvent.Service = &modelpb.Service{} } if baseEvent.Service.Language == nil { baseEvent.Service.Language = &modelpb.Language{} } baseEvent.Service.Language.Version = key.ServiceLanguageVersion } if key.HostHostname != "" || key.HostName != "" { if baseEvent.Host == nil { baseEvent.Host = &modelpb.Host{} } baseEvent.Host.Hostname = key.HostHostname baseEvent.Host.Name = key.HostName } if key.HostOsPlatform != "" { if baseEvent.Host == nil { baseEvent.Host = &modelpb.Host{} } if baseEvent.Host.Os == nil { baseEvent.Host.Os = &modelpb.OS{} } baseEvent.Host.Os.Platform = key.HostOsPlatform } faasColdstart := nullable.Bool(key.FaasColdstart) if faasColdstart != nullable.Nil || key.FaasId != "" || key.FaasName != "" || key.FaasVersion != "" || key.FaasTriggerType != "" { if baseEvent.Faas == nil { baseEvent.Faas = &modelpb.Faas{} } baseEvent.Faas.ColdStart = faasColdstart.ToBoolPtr() baseEvent.Faas.Id = key.FaasId baseEvent.Faas.Name = key.FaasName baseEvent.Faas.Version = key.FaasVersion baseEvent.Faas.TriggerType = key.FaasTriggerType } if key.CloudProvider != "" || key.CloudRegion != "" || key.CloudAvailabilityZone != "" || key.CloudServiceName != "" || key.CloudAccountId != "" || key.CloudAccountName != "" || key.CloudMachineType != "" || key.CloudProjectId != "" || key.CloudProjectName != "" { if baseEvent.Cloud == nil { baseEvent.Cloud = &modelpb.Cloud{} } baseEvent.Cloud.Provider = key.CloudProvider baseEvent.Cloud.Region = key.CloudRegion baseEvent.Cloud.AvailabilityZone = key.CloudAvailabilityZone baseEvent.Cloud.ServiceName = key.CloudServiceName baseEvent.Cloud.AccountId = key.CloudAccountId baseEvent.Cloud.AccountName = key.CloudAccountName baseEvent.Cloud.MachineType = key.CloudMachineType baseEvent.Cloud.ProjectId = key.CloudProjectId baseEvent.Cloud.ProjectName = 
key.CloudProjectName } }

// svcTxnMetricsToAPMEvent writes the service transaction metrics for key
// onto baseEvent. The duration histogram is rebuilt from its protobuf form;
// its total count becomes both the summary count and the metricset doc
// count, and the summary sum is the sum of value*count over all buckets.
// The event success count is populated from the rounded success and
// failure counters. Missing sub-messages on baseEvent are created on demand.
func svcTxnMetricsToAPMEvent(
	key *aggregationpb.ServiceTransactionAggregationKey,
	metrics *aggregationpb.ServiceTransactionMetrics,
	baseEvent *modelpb.APMEvent,
	intervalStr string,
) {
	hist := hdrhistogram.New()
	histogramFromProto(hist, metrics.Histogram)
	total, bucketCounts, bucketValues := hist.Buckets()

	durationSummary := &modelpb.SummaryMetric{Count: total}
	for idx := range bucketValues {
		durationSummary.Sum += bucketValues[idx] * float64(bucketCounts[idx])
	}

	ms := baseEvent.Metricset
	if ms == nil {
		ms = &modelpb.Metricset{}
		baseEvent.Metricset = ms
	}
	ms.Name = svcTxnMetricsetName
	ms.DocCount = total
	ms.Interval = intervalStr

	txn := baseEvent.Transaction
	if txn == nil {
		txn = &modelpb.Transaction{}
		baseEvent.Transaction = txn
	}
	txn.Type = key.TransactionType
	txn.DurationSummary = durationSummary
	if txn.DurationHistogram == nil {
		txn.DurationHistogram = &modelpb.Histogram{}
	}
	txn.DurationHistogram.Counts = bucketCounts
	txn.DurationHistogram.Values = bucketValues

	ev := baseEvent.Event
	if ev == nil {
		ev = &modelpb.Event{}
		baseEvent.Event = ev
	}
	if ev.SuccessCount == nil {
		ev.SuccessCount = &modelpb.SummaryMetric{}
	}
	// Count covers every transaction with a known outcome; Sum covers only
	// the successful ones.
	ev.SuccessCount.Count = uint64(math.Round(metrics.SuccessCount + metrics.FailureCount))
	ev.SuccessCount.Sum = math.Round(metrics.SuccessCount)
}

// spanMetricsToAPMEvent writes the span (service destination) metrics for
// key onto baseEvent. The service target is replaced unconditionally: it is
// set when the key carries a target name or type and cleared otherwise.
// The event outcome is only touched when the key has a non-empty outcome.
func spanMetricsToAPMEvent(
	key *aggregationpb.SpanAggregationKey,
	metrics *aggregationpb.SpanMetrics,
	baseEvent *modelpb.APMEvent,
	intervalStr string,
) {
	svc := baseEvent.Service
	if svc == nil {
		svc = &modelpb.Service{}
		baseEvent.Service = svc
	}
	svc.Target = nil
	if key.TargetName != "" || key.TargetType != "" {
		svc.Target = &modelpb.ServiceTarget{
			Type: key.TargetType,
			Name: key.TargetName,
		}
	}

	// The rounded count is used for both the doc count and the response
	// time count.
	roundedCount := uint64(math.Round(metrics.Count))

	ms := baseEvent.Metricset
	if ms == nil {
		ms = &modelpb.Metricset{}
		baseEvent.Metricset = ms
	}
	ms.Name = spanMetricsetName
	ms.DocCount = roundedCount
	ms.Interval = intervalStr

	span := baseEvent.Span
	if span == nil {
		span = &modelpb.Span{}
		baseEvent.Span = span
	}
	span.Name = key.SpanName

	dest := span.DestinationService
	if dest == nil {
		dest = &modelpb.DestinationService{}
		span.DestinationService = dest
	}
	dest.Resource = key.Resource
	if dest.ResponseTime == nil {
		dest.ResponseTime = &modelpb.AggregatedDuration{}
	}
	dest.ResponseTime.Count = roundedCount
	dest.ResponseTime.Sum = uint64(math.Round(metrics.Sum))

	if key.Outcome == "" {
		return
	}
	if baseEvent.Event == nil {
		baseEvent.Event = &modelpb.Event{}
	}
	baseEvent.Event.Outcome = key.Outcome
}

// overflowServiceMetricsToAPMEvent populates baseEvent with service summary
// metrics and appends a "service_summary.aggregation.overflow_count" sample
// carrying overflowCount.
func overflowServiceMetricsToAPMEvent(
	processingTime time.Time,
	overflowCount uint64,
	baseEvent *modelpb.APMEvent,
	intervalStr string,
) {
	// Overflow metrics are stamped with the processing time instead of the
	// event time, so that they can be associated with the time at which
	// the event volume caused the overflow.
	baseEvent.Timestamp = modelpb.FromTime(processingTime)
	serviceMetricsToAPMEvent(baseEvent, intervalStr)

	if baseEvent.Metricset == nil {
		baseEvent.Metricset = &modelpb.Metricset{}
	}
	baseEvent.Metricset.Samples = append(baseEvent.Metricset.Samples, &modelpb.MetricsetSample{
		Name:  "service_summary.aggregation.overflow_count",
		Value: float64(overflowCount),
	})
}

// overflowTxnMetricsToAPMEvent maps the overflow transaction metrics onto
// the passed APMEvent using the overflow bucket as the transaction name.
// Only transaction metrics related fields are updated; service related
// fields are expected to already be present in the passed APMEvent.
//
// Unlike the span metrics, which use the estimated overflow count as the
// doc count, the transaction metrics keep the value derived from the
// histogram to avoid consistency issues between the overflow estimate and
// the histogram.
func overflowTxnMetricsToAPMEvent(
	processingTime time.Time,
	overflowTxn *aggregationpb.TransactionMetrics,
	overflowCount uint64,
	baseEvent *modelpb.APMEvent,
	intervalStr string,
) {
	// Overflow metrics are stamped with the processing time instead of the
	// event time, so that they can be associated with the time at which
	// the event volume caused the overflow.
	baseEvent.Timestamp = modelpb.FromTime(processingTime)
	txnMetricsToAPMEvent(
		&aggregationpb.TransactionAggregationKey{TransactionName: overflowBucketName},
		overflowTxn,
		baseEvent,
		intervalStr,
	)

	if baseEvent.Metricset == nil {
		baseEvent.Metricset = &modelpb.Metricset{}
	}
	baseEvent.Metricset.Samples = append(baseEvent.Metricset.Samples, &modelpb.MetricsetSample{
		Name:  "transaction.aggregation.overflow_count",
		Value: float64(overflowCount),
	})
}

// overflowSvcTxnMetricsToAPMEvent maps the overflow service transaction
// metrics onto baseEvent using the overflow bucket as the transaction type,
// and appends a "service_transaction.aggregation.overflow_count" sample
// carrying overflowCount.
func overflowSvcTxnMetricsToAPMEvent(
	processingTime time.Time,
	overflowSvcTxn *aggregationpb.ServiceTransactionMetrics,
	overflowCount uint64,
	baseEvent *modelpb.APMEvent,
	intervalStr string,
) {
	// Overflow metrics are stamped with the processing time instead of the
	// event time, so that they can be associated with the time at which
	// the event volume caused the overflow.
	baseEvent.Timestamp = modelpb.FromTime(processingTime)
	svcTxnMetricsToAPMEvent(
		&aggregationpb.ServiceTransactionAggregationKey{TransactionType: overflowBucketName},
		overflowSvcTxn,
		baseEvent,
		intervalStr,
	)

	if baseEvent.Metricset == nil {
		baseEvent.Metricset = &modelpb.Metricset{}
	}
	baseEvent.Metricset.Samples = append(baseEvent.Metricset.Samples, &modelpb.MetricsetSample{
		Name:  "service_transaction.aggregation.overflow_count",
		Value: float64(overflowCount),
	})
}

// overflowSpanMetricsToAPMEvent maps the overflow span metrics onto
// baseEvent using the overflow bucket as the target name, appends a
// "service_destination.aggregation.overflow_count" sample, and overrides
// the metricset doc count with overflowCount.
func overflowSpanMetricsToAPMEvent(
	processingTime time.Time,
	overflowSpan *aggregationpb.SpanMetrics,
	overflowCount uint64,
	baseEvent *modelpb.APMEvent,
	intervalStr string,
) {
	// Overflow metrics are stamped with the processing time instead of the
	// event time, so that they can be associated with the time at which
	// the event volume caused the overflow.
	baseEvent.Timestamp = modelpb.FromTime(processingTime)
	spanMetricsToAPMEvent(
		&aggregationpb.SpanAggregationKey{TargetName: overflowBucketName},
		overflowSpan,
		baseEvent,
		intervalStr,
	)

	if baseEvent.Metricset == nil {
		baseEvent.Metricset = &modelpb.Metricset{}
	}
	ms := baseEvent.Metricset
	ms.Samples = append(ms.Samples, &modelpb.MetricsetSample{
		Name:  "service_destination.aggregation.overflow_count",
		Value: float64(overflowCount),
	})
	ms.DocCount = overflowCount
}

// marshalEventGlobalLabels serializes the labels and numeric labels of e
// that are flagged as global. It returns nil, nil when the event has no
// global labels of either kind.
func marshalEventGlobalLabels(e *modelpb.APMEvent) ([]byte, error) {
	// First pass: count the global entries so the slices can be sized once.
	var globalLabels, globalNumericLabels int
	for _, lv := range e.Labels {
		if lv.Global {
			globalLabels++
		}
	}
	for _, nlv := range e.NumericLabels {
		if nlv.Global {
			globalNumericLabels++
		}
	}
	if globalLabels == 0 && globalNumericLabels == 0 {
		return nil, nil
	}

	pb := &aggregationpb.GlobalLabels{}
	pb.Labels = slices.Grow(pb.Labels, globalLabels)
	pb.NumericLabels = slices.Grow(pb.NumericLabels, globalNumericLabels)

	// Keys must be sorted to ensure wire formats are deterministically
	// generated and strings are directly comparable, i.e. the protobuf
	// encodings are equal if and only if the structs are equal.
	for name, lv := range e.Labels {
		if !lv.Global {
			continue
		}
		label := &aggregationpb.Label{Key: name, Value: lv.Value}
		label.Values = slices.Grow(label.Values, len(lv.Values))[:len(lv.Values)]
		copy(label.Values, lv.Values)
		pb.Labels = append(pb.Labels, label)
	}
	sort.Slice(pb.Labels, func(a, b int) bool {
		return pb.Labels[a].Key < pb.Labels[b].Key
	})

	for name, nlv := range e.NumericLabels {
		if !nlv.Global {
			continue
		}
		label := &aggregationpb.NumericLabel{Key: name, Value: nlv.Value}
		label.Values = slices.Grow(label.Values, len(nlv.Values))[:len(nlv.Values)]
		copy(label.Values, nlv.Values)
		pb.NumericLabels = append(pb.NumericLabels, label)
	}
	sort.Slice(pb.NumericLabels, func(a, b int) bool {
		return pb.NumericLabels[a].Key < pb.NumericLabels[b].Key
	})

	return pb.MarshalVT()
}

// setTransactionKey fills key with the transaction aggregation dimensions
// read from e. Every field of the key listed here is overwritten; fields
// missing from the event end up as their zero values.
func setTransactionKey(e *modelpb.APMEvent, key *aggregationpb.TransactionAggregationKey) {
	faas := e.GetFaas()
	var coldstart nullable.Bool
	if faas != nil {
		coldstart.ParseBoolPtr(faas.ColdStart)
	}

	// Generated getters tolerate nil receivers, so the sub-messages can be
	// fetched once and queried without further nil checks.
	svc := e.GetService()
	host := e.GetHost()
	txn := e.GetTransaction()
	cloud := e.GetCloud()

	key.TraceRoot = e.GetParentId() == ""

	key.ContainerId = e.GetContainer().GetId()
	key.KubernetesPodName = e.GetKubernetes().GetPodName()

	key.ServiceVersion = svc.GetVersion()
	key.ServiceNodeName = svc.GetNode().GetName()
	key.ServiceRuntimeName = svc.GetRuntime().GetName()
	key.ServiceRuntimeVersion = svc.GetRuntime().GetVersion()
	key.ServiceLanguageVersion = svc.GetLanguage().GetVersion()

	key.HostHostname = host.GetHostname()
	key.HostName = host.GetName()
	key.HostOsPlatform = host.GetOs().GetPlatform()

	key.EventOutcome = e.GetEvent().GetOutcome()

	key.TransactionName = txn.GetName()
	key.TransactionType = txn.GetType()
	key.TransactionResult = txn.GetResult()

	key.FaasColdstart = uint32(coldstart)
	key.FaasId = faas.GetId()
	key.FaasName = faas.GetName()
	key.FaasVersion = faas.GetVersion()
	key.FaasTriggerType = faas.GetTriggerType()

	key.CloudProvider = cloud.GetProvider()
	key.CloudRegion = cloud.GetRegion()
	key.CloudAvailabilityZone = cloud.GetAvailabilityZone()
	key.CloudServiceName = cloud.GetServiceName()
	key.CloudAccountId = cloud.GetAccountId()
	key.CloudAccountName = cloud.GetAccountName()
	key.CloudMachineType = cloud.GetMachineType()
	key.CloudProjectId = cloud.GetProjectId()
	key.CloudProjectName = cloud.GetProjectName()
}

// setServiceTransactionKey fills key with the transaction type of e.
func setServiceTransactionKey(e *modelpb.APMEvent, key *aggregationpb.ServiceTransactionAggregationKey) {
	key.TransactionType = e.GetTransaction().GetType()
}

// setSpanKey fills key with the span name, event outcome, service target
// and destination service resource read from e. A missing target or
// destination service yields empty strings for the corresponding fields.
func setSpanKey(e *modelpb.APMEvent, key *aggregationpb.SpanAggregationKey) {
	span := e.GetSpan()
	target := e.GetService().GetTarget()

	key.SpanName = span.GetName()
	key.Outcome = e.GetEvent().GetOutcome()

	// Getters on nil messages return the zero value, which matches the
	// empty-string defaults used when target or destination are absent.
	key.TargetType = target.GetType()
	key.TargetName = target.GetName()
	key.Resource = span.GetDestinationService().GetResource()
}

// setDroppedSpanStatsKey fills key from the dropped span statistics dss.
// The span name of the key is left untouched.
func setDroppedSpanStatsKey(dss *modelpb.DroppedSpanStats, key *aggregationpb.SpanAggregationKey) {
	// Dropped span statistics do not contain span name because it
	// would be too expensive to track dropped span stats per span name.
	key.Resource = dss.GetDestinationServiceResource()
	key.TargetName = dss.GetServiceTargetName()
	key.TargetType = dss.GetServiceTargetType()
	key.Outcome = dss.GetOutcome()
}

// formatDuration renders d as a number of minutes with an "m" suffix when
// it is at least one minute long, and as a number of seconds with an "s"
// suffix otherwise. The number is printed without decimals in both cases.
func formatDuration(d time.Duration) string {
	minutes := d.Minutes()
	if minutes < 1 {
		return fmt.Sprintf("%.0fs", d.Seconds())
	}
	return fmt.Sprintf("%.0fm", minutes)
}