in aggregators/aggregator.go [475:611]
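// harvestForInterval harvests all combined metrics collected for the given
// aggregation interval whose processing time falls in [start, end), records
// telemetry for harvested and failed metrics, and deletes the processed key
// range from the database. It returns the number of combined metrics that
// were successfully harvested.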
func (a *Aggregator) harvestForInterval(
ctx context.Context,
snap *pebble.Snapshot,
start, end time.Time,
ivl time.Duration,
cachedEventsStats map[[16]byte]float64,
) (int, error) {
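	// Build the [start, end) key range covering all combined metrics
	// collected for this aggregation interval.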
from := CombinedMetricsKey{
Interval: ivl,
ProcessingTime: start,
}
to := CombinedMetricsKey{
Interval: ivl,
ProcessingTime: end,
}
lb := make([]byte, CombinedMetricsKeyEncodedSize)
ub := make([]byte, CombinedMetricsKeyEncodedSize)
from.MarshalBinaryToSizedBuffer(lb)
to.MarshalBinaryToSizedBuffer(ub)
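	// Open a point-keys-only iterator on the snapshot, bounded by the
	// encoded lower and upper keys marshalled above.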
iter, err := snap.NewIter(&pebble.IterOptions{
LowerBound: lb,
UpperBound: ub,
KeyTypes: pebble.IterKeyTypePointsOnly,
})
if err != nil {
return 0, fmt.Errorf("failed to create iter: %w", err)
}
defer iter.Close()
var harvestErrs []error
var cmCount int
ivlAttr := attribute.String(aggregationIvlKey, formatDuration(ivl))
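	// iter.First positions the iterator at the first key in range and
	// reports whether the range contains any data; the result gates the
	// DeleteRange cleanup after the loop.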
hasRangeData := iter.First()
for ; iter.Valid(); iter.Next() {
var cmk CombinedMetricsKey
if err := cmk.UnmarshalBinary(iter.Key()); err != nil {
harvestErrs = append(harvestErrs, fmt.Errorf("failed to unmarshal key: %w", err))
continue
}
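		// Process the stored combined metrics for this key; the returned
		// harvest stats drive the telemetry recorded below.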
harvestStats, err := a.processHarvest(ctx, cmk, iter.Value(), ivl)
if err != nil {
harvestErrs = append(harvestErrs, err)
continue
}
cmCount++
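		// Attributes identifying the combined metrics ID and the aggregation
		// interval are attached to every metric recorded for this key.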
commonAttrsOpt := metric.WithAttributeSet(attribute.NewSet(
append(a.cfg.CombinedMetricsIDToKVs(cmk.ID), ivlAttr)...,
))
// Report the estimated number of overflowed metrics per aggregation interval.
// It is not meaningful to aggregate these across intervals or aggregators,
// as the overflowed aggregation keys may be overlapping sets.
recordMetricsOverflow := func(n uint64, aggregationType string) {
if n == 0 {
return
}
a.metrics.MetricsOverflowed.Add(context.Background(), int64(n), commonAttrsOpt,
metric.WithAttributeSet(attribute.NewSet(
attribute.String(aggregationTypeKey, aggregationType),
)),
)
}
recordMetricsOverflow(harvestStats.servicesOverflowed, "service")
recordMetricsOverflow(harvestStats.transactionsOverflowed, "transaction")
recordMetricsOverflow(harvestStats.serviceTransactionsOverflowed, "service_transaction")
recordMetricsOverflow(harvestStats.spansOverflowed, "service_destination")
		// processingDelay is normalized by subtracting the aggregation
		// interval and the harvest delay, both of which are expected delays.
		// Normalization keeps the important values in the lower
		// (higher-resolution) range of the histogram. The normalized
		// processingDelay can be negative when a harvest is triggered
		// prematurely by the aggregator stopping; such negative values are
		// accepted and recorded in the lower histogram buckets.
processingDelay := time.Since(cmk.ProcessingTime).Seconds() -
(ivl.Seconds() + a.cfg.HarvestDelay.Seconds())
		// queuedDelay is not explicitly normalized because we want to record
		// the full delay. For a healthy deployment, the queued delay is
		// implicitly normalized because the youngest event timestamp is used.
		// Negative values are possible at the edges due to delays in running
		// the harvest loop or time-sync issues between agents and the server.
queuedDelay := time.Since(harvestStats.youngestEventTimestamp).Seconds()
		outcomeAttrOpt := metric.WithAttributeSet(
			attribute.NewSet(telemetry.WithSuccess()),
		)
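		// Both delay histograms are recorded with a success outcome; keys
		// that failed processing never reach this point.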
a.metrics.MinQueuedDelay.Record(context.Background(), queuedDelay, commonAttrsOpt, outcomeAttrOpt)
a.metrics.ProcessingLatency.Record(context.Background(), processingDelay, commonAttrsOpt, outcomeAttrOpt)
		// Harvested events have been processed successfully, so publish them
		// with a success outcome and subtract them from the cached totals;
		// whatever remains in the map afterwards is treated as failed.
a.metrics.EventsProcessed.Add(context.Background(), harvestStats.eventsTotal, commonAttrsOpt, outcomeAttrOpt)
cachedEventsStats[cmk.ID] -= harvestStats.eventsTotal
}
if len(harvestErrs) > 0 {
// Each harvest error represents failed processing of an aggregated metric
		err = errors.Join(err, fmt.Errorf(
			"failed to process %d out of %d metrics:\n%w",
			len(harvestErrs), cmCount+len(harvestErrs), errors.Join(harvestErrs...),
		))
}
if hasRangeData {
		// DeleteRange creates range tombstones, so only issue it when the
		// iterated range actually contained data.
if rangeCleanupErr := a.db.DeleteRange(lb, ub, a.writeOptions); rangeCleanupErr != nil {
err = errors.Join(err, fmt.Errorf("failed to delete processed range: %w", rangeCleanupErr))
}
}
// All remaining events in the cached events map should be failed events.
// Record these events with a failure outcome.
for cmID, eventsTotal := range cachedEventsStats {
if eventsTotal == 0 {
continue
}
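		// A negative remainder indicates the cached events stats are
		// inconsistent with what was actually harvested; warn and skip
		// recording a negative count against the failure metric.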
if eventsTotal < 0 {
fields := append([]zap.Field{
zap.Duration("aggregation_interval_ns", ivl),
zap.Float64("remaining_events", eventsTotal),
}, otelKVsToZapFields(a.cfg.CombinedMetricsIDToKVs(cmID))...)
a.cfg.Logger.Warn(
"unexpectedly failed to harvest all collected events",
fields...,
)
continue
}
attrSetOpt := metric.WithAttributeSet(
attribute.NewSet(append(
a.cfg.CombinedMetricsIDToKVs(cmID),
ivlAttr,
telemetry.WithFailure(),
)...),
)
a.metrics.EventsProcessed.Add(context.Background(), eventsTotal, attrSetOpt)
}
return cmCount, err
}