aggregators/models.go (184 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package aggregators import ( "time" "github.com/axiomhq/hyperloglog" "github.com/elastic/apm-aggregation/aggregationpb" "github.com/elastic/apm-aggregation/aggregators/nullable" "github.com/elastic/apm-data/model/modelpb" ) // Limits define the aggregation limits. Once the limits are reached // the metrics will overflow into dedicated overflow buckets. type Limits struct { // MaxServices is the limit on the total number of unique services. // A unique service is identified by a unique ServiceAggregationKey. // This limit is shared across all aggregation metrics. MaxServices int // MaxSpanGroups is the limit on total number of unique span groups // across all services. // A unique span group is identified by a unique // ServiceAggregationKey + SpanAggregationKey. MaxSpanGroups int // MaxSpanGroupsPerService is the limit on the total number of unique // span groups within a service. // A unique span group within a service is identified by a unique // SpanAggregationKey. MaxSpanGroupsPerService int // MaxTransactionGroups is the limit on total number of unique // transaction groups across all services. // A unique transaction group is identified by a unique // ServiceAggregationKey + TransactionAggregationKey. MaxTransactionGroups int // MaxTransactionGroupsPerService is the limit on the number of unique // transaction groups within a service. // A unique transaction group within a service is identified by a unique // TransactionAggregationKey. MaxTransactionGroupsPerService int // MaxServiceTransactionGroups is the limit on total number of unique // service transaction groups across all services. // A unique service transaction group is identified by a unique // ServiceAggregationKey + ServiceTransactionAggregationKey. MaxServiceTransactionGroups int // MaxServiceTransactionGroupsPerService is the limit on the number // of unique service transaction groups within a service. // A unique service transaction group within a service is identified // by a unique ServiceTransactionAggregationKey. MaxServiceTransactionGroupsPerService int } // CombinedMetricsKey models the key to store the data in LSM tree. // Each key-value pair represents a set of unique metric for a combined metrics ID. // The processing time used in the key should be rounded to the // duration of aggregation since the zero time. type CombinedMetricsKey struct { Interval time.Duration ProcessingTime time.Time PartitionID uint16 ID [16]byte } // globalLabels is an intermediate struct used to marshal/unmarshal the // provided global labels into a comparable format. The format is used by // pebble db to compare service aggregation keys. type globalLabels struct { Labels modelpb.Labels NumericLabels modelpb.NumericLabels } // combinedMetrics models the value to store the data in LSM tree. // Each unique combined metrics ID stores a combined metrics per aggregation // interval. combinedMetrics encapsulates the aggregated metrics // as well as the overflow metrics. type combinedMetrics struct { Services map[serviceAggregationKey]serviceMetrics // OverflowServices provides a dedicated bucket for collecting // aggregate metrics for all the aggregation groups for all services // that overflowed due to max services limit being reached. OverflowServices overflow // OverflowServicesEstimator estimates the number of unique service // aggregation keys that overflowed due to max services limit. OverflowServicesEstimator *hyperloglog.Sketch // EventsTotal is the total number of individual events, including // all overflows, that were aggregated for this combined metrics. It // is used for internal monitoring purposes and is approximated when // partitioning is enabled. EventsTotal float64 // YoungestEventTimestamp is the youngest event that was aggregated // in the combined metrics based on the received timestamp. YoungestEventTimestamp uint64 } // serviceAggregationKey models the key used to store service specific // aggregation metrics. type serviceAggregationKey struct { Timestamp time.Time ServiceName string ServiceEnvironment string ServiceLanguageName string AgentName string GlobalLabelsStr string } // serviceMetrics models the value to store all the aggregated metrics // for a specific service aggregation key. type serviceMetrics struct { OverflowGroups overflow TransactionGroups map[transactionAggregationKey]*aggregationpb.KeyedTransactionMetrics ServiceTransactionGroups map[serviceTransactionAggregationKey]*aggregationpb.KeyedServiceTransactionMetrics SpanGroups map[spanAggregationKey]*aggregationpb.KeyedSpanMetrics } func insertHash(to **hyperloglog.Sketch, hash uint64) { if *to == nil { *to = hyperloglog.New14() } (*to).InsertHash(hash) } func mergeEstimator(to **hyperloglog.Sketch, from *hyperloglog.Sketch) { if *to == nil { *to = hyperloglog.New14() } // Ignoring returned error here since the error is only returned if // the precision is set outside bounds which is not possible for our case. (*to).Merge(from) } type overflowTransaction struct { Metrics *aggregationpb.TransactionMetrics Estimator *hyperloglog.Sketch } func (o *overflowTransaction) Merge( from *aggregationpb.TransactionMetrics, hash uint64, ) { if o.Metrics == nil { o.Metrics = &aggregationpb.TransactionMetrics{} } mergeTransactionMetrics(o.Metrics, from) insertHash(&o.Estimator, hash) } func (o *overflowTransaction) MergeOverflow(from *overflowTransaction) { if from.Estimator != nil { if o.Metrics == nil { o.Metrics = &aggregationpb.TransactionMetrics{} } mergeTransactionMetrics(o.Metrics, from.Metrics) mergeEstimator(&o.Estimator, from.Estimator) } } func (o *overflowTransaction) Empty() bool { return o.Estimator == nil } type overflowServiceTransaction struct { Metrics *aggregationpb.ServiceTransactionMetrics Estimator *hyperloglog.Sketch } func (o *overflowServiceTransaction) Merge( from *aggregationpb.ServiceTransactionMetrics, hash uint64, ) { if o.Metrics == nil { o.Metrics = &aggregationpb.ServiceTransactionMetrics{} } mergeServiceTransactionMetrics(o.Metrics, from) insertHash(&o.Estimator, hash) } func (o *overflowServiceTransaction) MergeOverflow(from *overflowServiceTransaction) { if from.Estimator != nil { if o.Metrics == nil { o.Metrics = &aggregationpb.ServiceTransactionMetrics{} } mergeServiceTransactionMetrics(o.Metrics, from.Metrics) mergeEstimator(&o.Estimator, from.Estimator) } } func (o *overflowServiceTransaction) Empty() bool { return o.Estimator == nil } type overflowSpan struct { Metrics *aggregationpb.SpanMetrics Estimator *hyperloglog.Sketch } func (o *overflowSpan) Merge( from *aggregationpb.SpanMetrics, hash uint64, ) { if o.Metrics == nil { o.Metrics = &aggregationpb.SpanMetrics{} } mergeSpanMetrics(o.Metrics, from) insertHash(&o.Estimator, hash) } func (o *overflowSpan) MergeOverflow(from *overflowSpan) { if from.Estimator != nil { if o.Metrics == nil { o.Metrics = &aggregationpb.SpanMetrics{} } mergeSpanMetrics(o.Metrics, from.Metrics) mergeEstimator(&o.Estimator, from.Estimator) } } func (o *overflowSpan) Empty() bool { return o.Estimator == nil } // overflow contains transaction and spans overflow metrics and cardinality // estimators for the aggregation group for overflow buckets. type overflow struct { OverflowTransaction overflowTransaction OverflowServiceTransaction overflowServiceTransaction OverflowSpan overflowSpan } // transactionAggregationKey models the key used to store transaction // aggregation metrics. type transactionAggregationKey struct { TraceRoot bool ContainerID string KubernetesPodName string ServiceVersion string ServiceNodeName string ServiceRuntimeName string ServiceRuntimeVersion string ServiceLanguageVersion string HostHostname string HostName string HostOSPlatform string EventOutcome string TransactionName string TransactionType string TransactionResult string FAASColdstart nullable.Bool FAASID string FAASName string FAASVersion string FAASTriggerType string CloudProvider string CloudRegion string CloudAvailabilityZone string CloudServiceName string CloudAccountID string CloudAccountName string CloudMachineType string CloudProjectID string CloudProjectName string } // spanAggregationKey models the key used to store span aggregation metrics. type spanAggregationKey struct { SpanName string Outcome string TargetType string TargetName string Resource string } // serviceTransactionAggregationKey models the key used to store // service transaction aggregation metrics. type serviceTransactionAggregationKey struct { TransactionType string }