in aggregators/aggregator.go [706:785]
// addOverflows accumulates the overflow estimates found in cm into hs and
// logs a warning for each limit that appears to have been hit.
//
// The service-level overflow bucket (cm.OverflowServices) is processed first,
// followed by the overflow groups of every entry in cm.ServiceMetrics. For
// each of the three aggregation types (transactions, service transactions,
// spans) a positive estimate from hllSketchEstimate is added to the matching
// counter on hs and then reported through logger.
func (hs *harvestStats) addOverflows(cm *aggregationpb.CombinedMetrics, limits Limits, logger *zap.Logger) {
	if hs.servicesOverflowed != 0 {
		logger.Warn(serviceGroupLimitReachedMessage, zap.Int("limit", limits.MaxServices))
	}

	// Per aggregation type, remembers whether the overall warning has already
	// been emitted during this call so that it is logged at most once.
	var (
		txnOverallLogged    bool
		svcTxnOverallLogged bool
		spanOverallLogged   bool
	)

	// warnLimit picks which warning to emit for one overflowing aggregation
	// type: the per-service one when the service itself holds at least
	// perServiceLimit groups, otherwise the overall one (once only).
	warnLimit := func(
		groups, perServiceLimit int,
		key *aggregationpb.ServiceAggregationKey,
		perServiceMsg string,
		overallMsg string,
		overallLogged *bool,
	) {
		switch {
		case key == nil:
			// A nil key identifies the service overflow bucket. That
			// overflow stems from the cardinality of service keys rather
			// than metric keys, so there is nothing to report here.
		case groups >= perServiceLimit:
			logger.Warn(
				perServiceMsg,
				zap.String("service_name", key.GetServiceName()),
				zap.Int("limit", perServiceLimit),
			)
		case !*overallLogged:
			// NOTE(review): the "limit" field here carries the per-service
			// limit value, not an overall one; confirm this is intended.
			logger.Warn(overallMsg, zap.Int("limit", perServiceLimit))
			*overallLogged = true
		}
	}

	// record folds a single overflow bucket into hs. svc is nil for the
	// service overflow bucket; the getters called on it tolerate that.
	record := func(bucket *aggregationpb.Overflow, svc *aggregationpb.KeyedServiceMetrics) {
		if bucket == nil {
			return
		}
		key := svc.GetKey()
		metrics := svc.GetMetrics()

		if count := hllSketchEstimate(bucket.OverflowTransactionsEstimator); count > 0 {
			hs.transactionsOverflowed += count
			warnLimit(
				len(metrics.GetTransactionMetrics()),
				limits.MaxTransactionGroupsPerService,
				key,
				transactionGroupLimitReachedMessage,
				overallTransactionGroupLimitReachedMessage,
				&txnOverallLogged,
			)
		}
		if count := hllSketchEstimate(bucket.OverflowServiceTransactionsEstimator); count > 0 {
			hs.serviceTransactionsOverflowed += count
			warnLimit(
				len(metrics.GetServiceTransactionMetrics()),
				limits.MaxServiceTransactionGroupsPerService,
				key,
				serviceTransactionGroupLimitReachedMessage,
				overallServiceTransactionGroupLimitReachedMessage,
				&svcTxnOverallLogged,
			)
		}
		if count := hllSketchEstimate(bucket.OverflowSpansEstimator); count > 0 {
			hs.spansOverflowed += count
			warnLimit(
				len(metrics.GetSpanMetrics()),
				limits.MaxSpanGroupsPerService,
				key,
				spanGroupLimitReachedMessage,
				overallSpanGroupLimitReachedMessage,
				&spanOverallLogged,
			)
		}
	}

	record(cm.OverflowServices, nil)
	for _, svc := range cm.ServiceMetrics {
		record(svc.GetMetrics().GetOverflowGroups(), svc)
	}
}