aggregators/aggregator.go:
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

// Package aggregators holds the logic for doing the actual aggregation.
package aggregators

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/cockroachdb/pebble/v2"
	"github.com/cockroachdb/pebble/v2/vfs"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"

	"github.com/elastic/apm-aggregation/aggregationpb"
	"github.com/elastic/apm-aggregation/aggregators/internal/telemetry"
	"github.com/elastic/apm-data/model/modelpb"
)
const (
dbCommitThresholdBytes = 10 * 1024 * 1024 // commit every 10MB
aggregationIvlKey = "aggregation_interval"
aggregationTypeKey = "aggregation_type"
)
var (
	// ErrAggregatorClosed means that the aggregator was already closed
	// when the method was called, so the request cannot be processed.
	ErrAggregatorClosed = errors.New("aggregator is closed")
)
// Aggregator represents an LSM-based aggregator instance used to generate
// aggregated metrics. The metrics aggregated by the aggregator are
// harvested based on the aggregation interval and processed by the
// defined processor. The aggregated metrics are timestamped based on
// when the aggregator is created and on the harvest loop. All events
// collected between the calls to New and Run fall into the same
// processing-time bucket; thereafter the processing-time bucket advances
// in multiples of the aggregation interval.
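//
// For example, with a single 1m aggregation interval, an aggregator
// created at 12:00:42 starts with a processing time of 12:00:00 (New
// truncates time.Now() by the first interval). Events aggregated before
// the first harvest fall into the 12:00:00 bucket; subsequent buckets
// are 12:01:00, 12:02:00, and so on.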
type Aggregator struct {
db *pebble.DB
writeOptions *pebble.WriteOptions
cfg config
mu sync.Mutex
processingTime time.Time
batch *pebble.Batch
cachedEvents cachedEventsMap
closed chan struct{}
runStopped chan struct{}
metrics *telemetry.Metrics
}
// New returns a new aggregator instance.
//
// Close must be called when the aggregator is no longer needed.
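//
// A minimal usage sketch; the option constructors shown are assumptions
// and may not match the package's actual Option names:
//
//	agg, err := aggregators.New(
//		WithDataDir("/path/to/db"), // hypothetical option
//		WithLimits(Limits{}),       // hypothetical option
//	)
//	if err != nil {
//		// handle error
//	}
//	defer agg.Close(context.Background())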
func New(opts ...Option) (*Aggregator, error) {
cfg, err := newConfig(opts...)
if err != nil {
return nil, fmt.Errorf("failed to create aggregation config: %w", err)
}
pebbleOpts := &pebble.Options{
Merger: &pebble.Merger{
Name: "combined_metrics_merger",
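			// Merge is invoked by pebble with the first value observed for
			// a key; subsequent values for the same key are combined into
			// the returned combinedMetricsMerger via the pebble.ValueMerger
			// interface, and its Finish method produces the merged result.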
Merge: func(_, value []byte) (pebble.ValueMerger, error) {
merger := combinedMetricsMerger{
limits: cfg.Limits,
constraints: newConstraints(cfg.Limits),
}
var pb aggregationpb.CombinedMetrics
if err := pb.UnmarshalVT(value); err != nil {
return nil, fmt.Errorf("failed to unmarshal metrics: %w", err)
}
merger.merge(&pb)
return &merger, nil
},
},
}
writeOptions := pebble.Sync
if cfg.InMemory {
pebbleOpts.FS = vfs.NewMem()
pebbleOpts.DisableWAL = true
writeOptions = pebble.NoSync
}
pb, err := pebble.Open(cfg.DataDir, pebbleOpts)
if err != nil {
return nil, fmt.Errorf("failed to create pebble db: %w", err)
}
metrics, err := telemetry.NewMetrics(
func() *pebble.Metrics { return pb.Metrics() },
telemetry.WithMeter(cfg.Meter),
)
	if err != nil {
		// Close the db to avoid leaking resources if metrics setup fails.
		pb.Close()
		return nil, fmt.Errorf("failed to create metrics: %w", err)
	}
return &Aggregator{
db: pb,
writeOptions: writeOptions,
cfg: cfg,
processingTime: time.Now().Truncate(cfg.AggregationIntervals[0]),
closed: make(chan struct{}),
metrics: metrics,
}, nil
}
// AggregateBatch aggregates all events in the batch. This function will
// return an error if the aggregator's Run loop has errored or has been
// explicitly stopped. However, it doesn't require the aggregator to be
// running to perform aggregation.
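//
// A sketch of a call site (identifiers are illustrative):
//
//	var id [16]byte // combined metrics ID, e.g. a hash identifying the tenant
//	if err := agg.AggregateBatch(ctx, id, &batch); err != nil {
//		// handle error
//	}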
func (a *Aggregator) AggregateBatch(
ctx context.Context,
id [16]byte,
b *modelpb.Batch,
) error {
cmIDAttrs := a.cfg.CombinedMetricsIDToKVs(id)
a.mu.Lock()
defer a.mu.Unlock()
select {
case <-ctx.Done():
return ctx.Err()
case <-a.closed:
return ErrAggregatorClosed
default:
}
var (
errs []error
successBytes, failBytes int64
)
cmk := CombinedMetricsKey{ID: id}
for _, ivl := range a.cfg.AggregationIntervals {
cmk.ProcessingTime = a.processingTime.Truncate(ivl)
cmk.Interval = ivl
for _, e := range *b {
bytesIn, err := a.aggregateAPMEvent(ctx, cmk, e)
if err != nil {
errs = append(errs, err)
failBytes += int64(bytesIn)
} else {
successBytes += int64(bytesIn)
}
}
a.cachedEvents.add(ivl, id, float64(len(*b)))
}
var err error
if len(errs) > 0 {
a.metrics.BytesProcessed.Add(context.Background(), failBytes, metric.WithAttributeSet(
attribute.NewSet(append(cmIDAttrs, telemetry.WithFailure())...),
))
err = fmt.Errorf("failed batch aggregation:\n%w", errors.Join(errs...))
}
a.metrics.BytesProcessed.Add(context.Background(), successBytes, metric.WithAttributeSet(
attribute.NewSet(append(cmIDAttrs, telemetry.WithSuccess())...),
))
return err
}
// AggregateCombinedMetrics aggregates partial metrics into a bigger aggregate.
// This function will return an error if the aggregator's Run loop has errored
// or has been explicitly stopped. However, it doesn't require the aggregator
// to be running to perform aggregation.
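//
// A sketch of forwarding a partial aggregate (values are illustrative):
//
//	cmk := CombinedMetricsKey{
//		ID:             id,
//		Interval:       time.Minute,
//		ProcessingTime: procTime.Truncate(time.Minute),
//	}
//	err := agg.AggregateCombinedMetrics(ctx, cmk, partialMetrics)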
func (a *Aggregator) AggregateCombinedMetrics(
ctx context.Context,
cmk CombinedMetricsKey,
cm *aggregationpb.CombinedMetrics,
) error {
cmIDAttrs := a.cfg.CombinedMetricsIDToKVs(cmk.ID)
traceAttrs := append(cmIDAttrs,
attribute.String(aggregationIvlKey, formatDuration(cmk.Interval)),
attribute.String("processing_time", cmk.ProcessingTime.String()),
)
ctx, span := a.cfg.Tracer.Start(ctx, "AggregateCombinedMetrics", trace.WithAttributes(traceAttrs...))
defer span.End()
a.mu.Lock()
defer a.mu.Unlock()
select {
case <-ctx.Done():
return ctx.Err()
case <-a.closed:
return ErrAggregatorClosed
default:
}
if cmk.ProcessingTime.Before(a.processingTime.Add(-a.cfg.Lookback)) {
a.metrics.EventsProcessed.Add(
context.Background(), cm.EventsTotal,
metric.WithAttributeSet(attribute.NewSet(
append(a.cfg.CombinedMetricsIDToKVs(cmk.ID),
attribute.String(aggregationIvlKey, formatDuration(cmk.Interval)),
telemetry.WithFailure(),
)...,
)),
)
a.cfg.Logger.Warn(
"received expired combined metrics, dropping silently",
zap.Time("received_processing_time", cmk.ProcessingTime),
zap.Time("current_processing_time", a.processingTime),
)
return nil
}
var attrSetOpt metric.MeasurementOption
bytesIn, err := a.aggregate(ctx, cmk, cm)
if err != nil {
attrSetOpt = metric.WithAttributeSet(
attribute.NewSet(append(cmIDAttrs, telemetry.WithFailure())...),
)
} else {
attrSetOpt = metric.WithAttributeSet(
attribute.NewSet(append(cmIDAttrs, telemetry.WithSuccess())...),
)
}
span.SetAttributes(attribute.Int("bytes_ingested", bytesIn))
a.cachedEvents.add(cmk.Interval, cmk.ID, cm.EventsTotal)
a.metrics.BytesProcessed.Add(context.Background(), int64(bytesIn), attrSetOpt)
return err
}
// Run harvests the aggregated results periodically. For an aggregator,
// Run must be called at most once.
// - Running more than once will return an error.
// - Running after the aggregator is stopped will return ErrAggregatorClosed.
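//
// Typical lifecycle (a sketch; error handling elided):
//
//	go func() {
//		if err := agg.Run(ctx); err != nil && !errors.Is(err, ErrAggregatorClosed) {
//			// handle the terminal Run error
//		}
//	}()
//	// ... aggregate events ...
//	_ = agg.Close(ctx) // stops Run and performs a final harvest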
func (a *Aggregator) Run(ctx context.Context) error {
a.mu.Lock()
if a.runStopped != nil {
a.mu.Unlock()
return errors.New("aggregator is already running")
}
a.runStopped = make(chan struct{})
a.mu.Unlock()
defer close(a.runStopped)
to := a.processingTime.Add(a.cfg.AggregationIntervals[0])
timer := time.NewTimer(time.Until(to.Add(a.cfg.HarvestDelay)))
defer timer.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-a.closed:
return ErrAggregatorClosed
case <-timer.C:
}
a.mu.Lock()
batch := a.batch
a.batch = nil
a.processingTime = to
cachedEventsStats := a.cachedEvents.loadAndDelete(to)
a.mu.Unlock()
if err := a.commitAndHarvest(ctx, batch, to, cachedEventsStats); err != nil {
a.cfg.Logger.Warn("failed to commit and harvest metrics", zap.Error(err))
}
to = to.Add(a.cfg.AggregationIntervals[0])
timer.Reset(time.Until(to.Add(a.cfg.HarvestDelay)))
}
}
// Close commits and closes any buffered writes, stops any running harvester,
// performs a final harvest, and closes the underlying database.
//
// No further writes may be performed after Close is called, and no further
// harvests will be performed once Close returns.
func (a *Aggregator) Close(ctx context.Context) error {
ctx, span := a.cfg.Tracer.Start(ctx, "Aggregator.Close")
defer span.End()
a.mu.Lock()
defer a.mu.Unlock()
select {
case <-a.closed:
default:
a.cfg.Logger.Info("stopping aggregator")
close(a.closed)
}
if a.runStopped != nil {
select {
case <-ctx.Done():
return fmt.Errorf("context cancelled while waiting for run to complete: %w", ctx.Err())
case <-a.runStopped:
}
}
if a.db != nil {
a.cfg.Logger.Info("running final aggregation")
if a.batch != nil {
if err := a.batch.Commit(a.writeOptions); err != nil {
span.RecordError(err)
return fmt.Errorf("failed to commit batch: %w", err)
}
if err := a.batch.Close(); err != nil {
span.RecordError(err)
return fmt.Errorf("failed to close batch: %w", err)
}
a.batch = nil
}
var errs []error
for _, ivl := range a.cfg.AggregationIntervals {
// At any particular time there will be 1 harvest candidate for
// each aggregation interval. We will align the end time and
// process each of these.
//
// TODO (lahsivjar): It is possible to harvest the same
// time multiple times, not an issue but can be optimized.
to := a.processingTime.Truncate(ivl).Add(ivl)
if err := a.harvest(ctx, to, a.cachedEvents.loadAndDelete(to)); err != nil {
span.RecordError(err)
errs = append(errs, fmt.Errorf(
"failed to harvest metrics for interval %s: %w", formatDuration(ivl), err),
)
}
}
if len(errs) > 0 {
return fmt.Errorf("failed while running final harvest: %w", errors.Join(errs...))
}
if err := a.db.Close(); err != nil {
span.RecordError(err)
return fmt.Errorf("failed to close pebble: %w", err)
}
// All future operations are invalid after db is closed
a.db = nil
}
if err := a.metrics.CleanUp(); err != nil {
span.RecordError(err)
return fmt.Errorf("failed to cleanup instrumentation: %w", err)
}
return nil
}
func (a *Aggregator) aggregateAPMEvent(
ctx context.Context,
cmk CombinedMetricsKey,
e *modelpb.APMEvent,
) (int, error) {
var totalBytesIn int
aggregateFunc := func(k CombinedMetricsKey, m *aggregationpb.CombinedMetrics) error {
bytesIn, err := a.aggregate(ctx, k, m)
totalBytesIn += bytesIn
return err
}
err := EventToCombinedMetrics(e, cmk, a.cfg.Partitions, aggregateFunc)
if err != nil {
return 0, fmt.Errorf("failed to aggregate combined metrics: %w", err)
}
return totalBytesIn, nil
}
// aggregate aggregates combined metrics for a given key and returns the
// number of bytes ingested along with the error, if any.
func (a *Aggregator) aggregate(
ctx context.Context,
cmk CombinedMetricsKey,
cm *aggregationpb.CombinedMetrics,
) (int, error) {
if a.batch == nil {
// Batch is backed by a sync pool. After each commit we will release the batch
// back to the pool by calling Batch#Close and subsequently acquire a new batch.
a.batch = a.db.NewBatch()
}
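	// MergeDeferred reserves space for the key and value in the batch so
	// we can marshal directly into the batch's buffer without intermediate
	// allocations; the deferred operation is completed by op.Finish below.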
op := a.batch.MergeDeferred(cmk.SizeBinary(), cm.SizeVT())
if err := cmk.MarshalBinaryToSizedBuffer(op.Key); err != nil {
return 0, fmt.Errorf("failed to marshal combined metrics key: %w", err)
}
if _, err := cm.MarshalToSizedBufferVT(op.Value); err != nil {
return 0, fmt.Errorf("failed to marshal combined metrics: %w", err)
}
if err := op.Finish(); err != nil {
return 0, fmt.Errorf("failed to finalize merge operation: %w", err)
}
bytesIn := cm.SizeVT()
if a.batch.Len() >= dbCommitThresholdBytes {
if err := a.batch.Commit(a.writeOptions); err != nil {
return bytesIn, fmt.Errorf("failed to commit pebble batch: %w", err)
}
if err := a.batch.Close(); err != nil {
return bytesIn, fmt.Errorf("failed to close pebble batch: %w", err)
}
a.batch = nil
}
return bytesIn, nil
}
func (a *Aggregator) commitAndHarvest(
ctx context.Context,
batch *pebble.Batch,
to time.Time,
cachedEventsStats map[time.Duration]map[[16]byte]float64,
) error {
ctx, span := a.cfg.Tracer.Start(ctx, "commitAndHarvest")
defer span.End()
var errs []error
if batch != nil {
if err := batch.Commit(a.writeOptions); err != nil {
span.RecordError(err)
errs = append(errs, fmt.Errorf("failed to commit batch before harvest: %w", err))
}
if err := batch.Close(); err != nil {
span.RecordError(err)
errs = append(errs, fmt.Errorf("failed to close batch before harvest: %w", err))
}
}
if err := a.harvest(ctx, to, cachedEventsStats); err != nil {
span.RecordError(err)
errs = append(errs, fmt.Errorf("failed to harvest aggregated metrics: %w", err))
}
if len(errs) > 0 {
return errors.Join(errs...)
}
return nil
}
// harvest collects the mature metrics for all aggregation intervals and
// deletes the entries in the db once the metrics are fully harvested.
// Harvest takes an end time denoting the exclusive upper bound for harvesting.
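//
// For example, with aggregation intervals of 1m, 10m, and 60m, an end
// time of 12:10:00 is aligned for the 1m and 10m intervals, so both are
// harvested, while the 60m interval is skipped until 13:00:00.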
func (a *Aggregator) harvest(
ctx context.Context,
end time.Time,
cachedEventsStats map[time.Duration]map[[16]byte]float64,
) error {
snap := a.db.NewSnapshot()
defer snap.Close()
var errs []error
for _, ivl := range a.cfg.AggregationIntervals {
// Check if the given aggregation interval needs to be harvested now
if end.Truncate(ivl).Equal(end) {
start := end.Add(-ivl).Add(-a.cfg.Lookback)
cmCount, err := a.harvestForInterval(
ctx, snap, start, end, ivl, cachedEventsStats[ivl],
)
if err != nil {
errs = append(errs, fmt.Errorf(
"failed to harvest aggregated metrics for interval %s: %w",
ivl, err,
))
}
a.cfg.Logger.Debug(
"Finished harvesting aggregated metrics",
zap.Int("combined_metrics_successfully_harvested", cmCount),
zap.Duration("aggregation_interval_ns", ivl),
zap.Time("harvested_till(exclusive)", end),
zap.Error(err),
)
}
}
return errors.Join(errs...)
}
// harvestForInterval harvests aggregated metrics for a given interval.
// Returns the number of combined metrics successfully harvested and an
// error. It is possible to have a non-nil error and a count greater than
// zero if only some of the combined metrics failed to harvest.
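//
// The scanned key range is half-open, [start, end): pebble treats
// LowerBound as inclusive and UpperBound as exclusive, matching the
// exclusive harvest end time.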
func (a *Aggregator) harvestForInterval(
ctx context.Context,
snap *pebble.Snapshot,
start, end time.Time,
ivl time.Duration,
cachedEventsStats map[[16]byte]float64,
) (int, error) {
from := CombinedMetricsKey{
Interval: ivl,
ProcessingTime: start,
}
to := CombinedMetricsKey{
Interval: ivl,
ProcessingTime: end,
}
lb := make([]byte, CombinedMetricsKeyEncodedSize)
ub := make([]byte, CombinedMetricsKeyEncodedSize)
from.MarshalBinaryToSizedBuffer(lb)
to.MarshalBinaryToSizedBuffer(ub)
iter, err := snap.NewIter(&pebble.IterOptions{
LowerBound: lb,
UpperBound: ub,
KeyTypes: pebble.IterKeyTypePointsOnly,
})
if err != nil {
return 0, fmt.Errorf("failed to create iter: %w", err)
}
defer iter.Close()
var harvestErrs []error
var cmCount int
ivlAttr := attribute.String(aggregationIvlKey, formatDuration(ivl))
hasRangeData := iter.First()
for ; iter.Valid(); iter.Next() {
var cmk CombinedMetricsKey
if err := cmk.UnmarshalBinary(iter.Key()); err != nil {
harvestErrs = append(harvestErrs, fmt.Errorf("failed to unmarshal key: %w", err))
continue
}
harvestStats, err := a.processHarvest(ctx, cmk, iter.Value(), ivl)
if err != nil {
harvestErrs = append(harvestErrs, err)
continue
}
cmCount++
commonAttrsOpt := metric.WithAttributeSet(attribute.NewSet(
append(a.cfg.CombinedMetricsIDToKVs(cmk.ID), ivlAttr)...,
))
// Report the estimated number of overflowed metrics per aggregation interval.
// It is not meaningful to aggregate these across intervals or aggregators,
// as the overflowed aggregation keys may be overlapping sets.
recordMetricsOverflow := func(n uint64, aggregationType string) {
if n == 0 {
return
}
a.metrics.MetricsOverflowed.Add(context.Background(), int64(n), commonAttrsOpt,
metric.WithAttributeSet(attribute.NewSet(
attribute.String(aggregationTypeKey, aggregationType),
)),
)
}
recordMetricsOverflow(harvestStats.servicesOverflowed, "service")
recordMetricsOverflow(harvestStats.transactionsOverflowed, "transaction")
recordMetricsOverflow(harvestStats.serviceTransactionsOverflowed, "service_transaction")
recordMetricsOverflow(harvestStats.spansOverflowed, "service_destination")
		// processingDelay is normalized by subtracting the aggregation
		// interval and the harvest delay, both of which are expected
		// delays. Normalization helps us use the lower (higher resolution)
		// range of the histogram for the important values. The normalized
		// processingDelay can be negative as a result of a premature
		// harvest triggered by a stop of the aggregator. The negative
		// value is accepted as a good value and recorded in the lower
		// histogram buckets.
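		// For example, with a 1m interval and a 5s harvest delay, a
		// combined metric with processing time 12:00:00 that is harvested
		// at 12:01:07 records 67s - (60s + 5s) = 2s.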
processingDelay := time.Since(cmk.ProcessingTime).Seconds() -
(ivl.Seconds() + a.cfg.HarvestDelay.Seconds())
		// queuedDelay is not explicitly normalized because we want to
		// record the full delay. For a healthy deployment, the queued
		// delay is implicitly normalized due to the use of the youngest
		// event timestamp. Negative values are possible at the edges due
		// to delays in running the harvest loop or time-sync issues
		// between agents and the server.
queuedDelay := time.Since(harvestStats.youngestEventTimestamp).Seconds()
outcomeAttrOpt := metric.WithAttributeSet(attribute.NewSet(
telemetry.WithSuccess()),
)
a.metrics.MinQueuedDelay.Record(context.Background(), queuedDelay, commonAttrsOpt, outcomeAttrOpt)
a.metrics.ProcessingLatency.Record(context.Background(), processingDelay, commonAttrsOpt, outcomeAttrOpt)
		// Events harvested have been successfully processed; publish
		// these as success. Update the map to keep track of failed events.
a.metrics.EventsProcessed.Add(context.Background(), harvestStats.eventsTotal, commonAttrsOpt, outcomeAttrOpt)
cachedEventsStats[cmk.ID] -= harvestStats.eventsTotal
}
if len(harvestErrs) > 0 {
// Each harvest error represents failed processing of an aggregated metric
err = errors.Join(err, fmt.Errorf(
"failed to process %d out of %d metrics:\n%w",
len(harvestErrs), cmCount, errors.Join(harvestErrs...),
))
}
if hasRangeData {
		// DeleteRange will create range tombstones, so we only perform
		// the operation if we have identified data in the interval.
if rangeCleanupErr := a.db.DeleteRange(lb, ub, a.writeOptions); rangeCleanupErr != nil {
err = errors.Join(err, fmt.Errorf("failed to delete processed range: %w", rangeCleanupErr))
}
}
// All remaining events in the cached events map should be failed events.
// Record these events with a failure outcome.
for cmID, eventsTotal := range cachedEventsStats {
if eventsTotal == 0 {
continue
}
if eventsTotal < 0 {
fields := append([]zap.Field{
zap.Duration("aggregation_interval_ns", ivl),
zap.Float64("remaining_events", eventsTotal),
}, otelKVsToZapFields(a.cfg.CombinedMetricsIDToKVs(cmID))...)
a.cfg.Logger.Warn(
"unexpectedly failed to harvest all collected events",
fields...,
)
continue
}
attrSetOpt := metric.WithAttributeSet(
attribute.NewSet(append(
a.cfg.CombinedMetricsIDToKVs(cmID),
ivlAttr,
telemetry.WithFailure(),
)...),
)
a.metrics.EventsProcessed.Add(context.Background(), eventsTotal, attrSetOpt)
}
return cmCount, err
}
func (a *Aggregator) processHarvest(
ctx context.Context,
cmk CombinedMetricsKey,
cmb []byte,
aggIvl time.Duration,
) (harvestStats, error) {
var cm aggregationpb.CombinedMetrics
if err := cm.UnmarshalVT(cmb); err != nil {
return harvestStats{}, fmt.Errorf("failed to unmarshal metrics: %w", err)
}
// Processor can mutate the CombinedMetrics, so we cannot rely on the
// CombinedMetrics after Processor is called. Take a snapshot of the
// fields we record if processing succeeds.
hs := harvestStats{
eventsTotal: cm.EventsTotal,
youngestEventTimestamp: modelpb.ToTime(cm.YoungestEventTimestamp),
servicesOverflowed: hllSketchEstimate(cm.OverflowServicesEstimator),
}
overflowLogger := nopLogger
if a.cfg.OverflowLogging {
fields := append([]zap.Field{
zap.Duration("aggregation_interval_ns", aggIvl),
}, otelKVsToZapFields(a.cfg.CombinedMetricsIDToKVs(cmk.ID))...)
overflowLogger = a.cfg.Logger.WithLazy(fields...)
}
hs.addOverflows(&cm, a.cfg.Limits, overflowLogger)
if err := a.cfg.Processor(ctx, cmk, &cm, aggIvl); err != nil {
return harvestStats{}, fmt.Errorf("failed to process combined metrics ID %s: %w", cmk.ID, err)
}
return hs, nil
}
var (
nopLogger = zap.NewNop()
// TODO(carsonip): Update this log message when global labels implementation changes
serviceGroupLimitReachedMessage = fmt.Sprintf(""+
"Service limit reached, new metric documents will be grouped under a dedicated "+
"overflow bucket identified by service name '%s'. "+
"If you are sending global labels that are request-specific (e.g. client IP), it may cause "+
"high cardinality and lead to exhaustion of services.",
overflowBucketName,
)
transactionGroupLimitReachedMessage = "" +
"Transaction group per service limit reached, " + transactionGroupLimitReachedSuffix
overallTransactionGroupLimitReachedMessage = "" +
"Overall transaction group limit reached, " + transactionGroupLimitReachedSuffix
transactionGroupLimitReachedSuffix = fmt.Sprintf(""+
"new metric documents will be grouped under a dedicated bucket identified by transaction name '%s'. "+
"This is typically caused by ineffective transaction grouping, "+
"e.g. by creating many unique transaction names. "+
"If you are using an agent with 'use_path_as_transaction_name' enabled, it may cause "+
"high cardinality. If your agent supports the 'transaction_name_groups' option, setting "+
"that configuration option appropriately, may lead to better results.",
overflowBucketName,
)
serviceTransactionGroupLimitReachedMessage = fmt.Sprintf(""+
"Service transaction group per service limit reached, new metric documents will be grouped "+
"under a dedicated bucket identified by transaction type '%s'.",
overflowBucketName,
)
overallServiceTransactionGroupLimitReachedMessage = fmt.Sprintf(""+
"Overall service transaction group limit reached, new metric documents will be grouped "+
"under a dedicated bucket identified by transaction type '%s'.",
overflowBucketName,
)
spanGroupLimitReachedMessage = fmt.Sprintf(""+
"Span group per service limit reached, new metric documents will be grouped "+
"under a dedicated bucket identified by service target name '%s'.",
overflowBucketName,
)
overallSpanGroupLimitReachedMessage = fmt.Sprintf(""+
"Overall span group limit reached, new metric documents will be grouped "+
"under a dedicated bucket identified by service target name '%s'.",
overflowBucketName,
)
)
type harvestStats struct {
eventsTotal float64
youngestEventTimestamp time.Time
servicesOverflowed uint64
transactionsOverflowed uint64
serviceTransactionsOverflowed uint64
spansOverflowed uint64
}
func (hs *harvestStats) addOverflows(cm *aggregationpb.CombinedMetrics, limits Limits, logger *zap.Logger) {
if hs.servicesOverflowed != 0 {
logger.Warn(serviceGroupLimitReachedMessage, zap.Int("limit", limits.MaxServices))
}
	// Flags indicating that the overall limit has been reached for each
	// aggregation type, so that each overall-limit message is logged only once.
var loggedOverallTransactionGroupLimitReached bool
var loggedOverallServiceTransactionGroupLimitReached bool
var loggedOverallSpanGroupLimitReached bool
logLimitReached := func(
n, limit int,
serviceKey *aggregationpb.ServiceAggregationKey,
perServiceMessage string,
overallMessage string,
loggedOverallMessage *bool,
) {
if serviceKey == nil {
			// serviceKey will be nil for the service overflow, which is
			// caused by the cardinality of service keys, not metric keys.
return
}
		if n >= limit {
			logger.Warn(
				perServiceMessage,
				zap.String("service_name", serviceKey.GetServiceName()),
				zap.Int("limit", limit),
			)
			return
		}
		if !*loggedOverallMessage {
			logger.Warn(overallMessage, zap.Int("limit", limit))
			*loggedOverallMessage = true
		}
}
addOverflow := func(o *aggregationpb.Overflow, ksm *aggregationpb.KeyedServiceMetrics) {
if o == nil {
return
}
if overflowed := hllSketchEstimate(o.OverflowTransactionsEstimator); overflowed > 0 {
hs.transactionsOverflowed += overflowed
logLimitReached(
len(ksm.GetMetrics().GetTransactionMetrics()),
limits.MaxTransactionGroupsPerService,
ksm.GetKey(),
transactionGroupLimitReachedMessage,
overallTransactionGroupLimitReachedMessage,
&loggedOverallTransactionGroupLimitReached,
)
}
if overflowed := hllSketchEstimate(o.OverflowServiceTransactionsEstimator); overflowed > 0 {
hs.serviceTransactionsOverflowed += overflowed
logLimitReached(
len(ksm.GetMetrics().GetServiceTransactionMetrics()),
limits.MaxServiceTransactionGroupsPerService,
ksm.GetKey(),
serviceTransactionGroupLimitReachedMessage,
overallServiceTransactionGroupLimitReachedMessage,
&loggedOverallServiceTransactionGroupLimitReached,
)
}
if overflowed := hllSketchEstimate(o.OverflowSpansEstimator); overflowed > 0 {
hs.spansOverflowed += overflowed
logLimitReached(
len(ksm.GetMetrics().GetSpanMetrics()),
limits.MaxSpanGroupsPerService,
ksm.GetKey(),
spanGroupLimitReachedMessage,
overallSpanGroupLimitReachedMessage,
&loggedOverallSpanGroupLimitReached,
)
}
}
addOverflow(cm.OverflowServices, nil)
for _, ksm := range cm.ServiceMetrics {
addOverflow(ksm.GetMetrics().GetOverflowGroups(), ksm)
}
}