func (a *Aggregator) harvestForInterval()

in aggregators/aggregator.go [475:611]


func (a *Aggregator) harvestForInterval(
	ctx context.Context,
	snap *pebble.Snapshot,
	start, end time.Time,
	ivl time.Duration,
	cachedEventsStats map[[16]byte]float64,
) (int, error) {
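	// Encode the lower and upper bound keys covering processing times in
	// [start, end) for this aggregation interval.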
	from := CombinedMetricsKey{
		Interval:       ivl,
		ProcessingTime: start,
	}
	to := CombinedMetricsKey{
		Interval:       ivl,
		ProcessingTime: end,
	}
	lb := make([]byte, CombinedMetricsKeyEncodedSize)
	ub := make([]byte, CombinedMetricsKeyEncodedSize)
	from.MarshalBinaryToSizedBuffer(lb)
	to.MarshalBinaryToSizedBuffer(ub)

	iter, err := snap.NewIter(&pebble.IterOptions{
		LowerBound: lb,
		UpperBound: ub,
		KeyTypes:   pebble.IterKeyTypePointsOnly,
	})
	if err != nil {
		return 0, fmt.Errorf("failed to create iter: %w", err)
	}
	defer iter.Close()

	var harvestErrs []error
	var cmCount int
	ivlAttr := attribute.String(aggregationIvlKey, formatDuration(ivl))
	hasRangeData := iter.First()
	for ; iter.Valid(); iter.Next() {
		var cmk CombinedMetricsKey
		if err := cmk.UnmarshalBinary(iter.Key()); err != nil {
			harvestErrs = append(harvestErrs, fmt.Errorf("failed to unmarshal key: %w", err))
			continue
		}
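		// Process the stored value for this key; the returned harvest statistics
		// feed the telemetry recorded below.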
		harvestStats, err := a.processHarvest(ctx, cmk, iter.Value(), ivl)
		if err != nil {
			harvestErrs = append(harvestErrs, err)
			continue
		}
		cmCount++

		commonAttrsOpt := metric.WithAttributeSet(attribute.NewSet(
			append(a.cfg.CombinedMetricsIDToKVs(cmk.ID), ivlAttr)...,
		))

		// Report the estimated number of overflowed metrics per aggregation interval.
		// It is not meaningful to aggregate these across intervals or aggregators,
		// as the overflowed aggregation keys may be overlapping sets.
		recordMetricsOverflow := func(n uint64, aggregationType string) {
			if n == 0 {
				return
			}
			a.metrics.MetricsOverflowed.Add(context.Background(), int64(n), commonAttrsOpt,
				metric.WithAttributeSet(attribute.NewSet(
					attribute.String(aggregationTypeKey, aggregationType),
				)),
			)
		}
		recordMetricsOverflow(harvestStats.servicesOverflowed, "service")
		recordMetricsOverflow(harvestStats.transactionsOverflowed, "transaction")
		recordMetricsOverflow(harvestStats.serviceTransactionsOverflowed, "service_transaction")
		recordMetricsOverflow(harvestStats.spansOverflowed, "service_destination")

		// processingDelay is normalized by subtracting aggregation interval and
		// harvest delay, both of which are expected delays. Normalization helps
		// us to use the lower (higher resolution) range of the histogram for the
		// important values. The normalized processingDelay can be negative when a
		// premature harvest is triggered by the aggregator being stopped. Such
		// negative values are accepted as good values and recorded in the lowest
		// histogram buckets.
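		// For example, with a 1m aggregation interval and a 30s harvest delay, a
		// key whose processing time was 95s ago yields 95 - (60 + 30) = 5 seconds.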
		processingDelay := time.Since(cmk.ProcessingTime).Seconds() -
			(ivl.Seconds() + a.cfg.HarvestDelay.Seconds())
		// queuedDelay is not explicitly normalized because we want to record the
		// full delay. In a healthy deployment the queued delay is implicitly
		// normalized because it is measured from the youngest event timestamp.
		// Negative values are possible at the edges due to delays in running the
		// harvest loop or time-sync issues between agents and the server.
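		// For example, if the youngest event in this harvest was recorded 70s ago,
		// queuedDelay is 70 seconds, regardless of interval or harvest delay.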
		queuedDelay := time.Since(harvestStats.youngestEventTimestamp).Seconds()
		outcomeAttrOpt := metric.WithAttributeSet(attribute.NewSet(
			telemetry.WithSuccess()),
		)
		a.metrics.MinQueuedDelay.Record(context.Background(), queuedDelay, commonAttrsOpt, outcomeAttrOpt)
		a.metrics.ProcessingLatency.Record(context.Background(), processingDelay, commonAttrsOpt, outcomeAttrOpt)
		// Harvested events have been processed successfully, so publish them with
		// a success outcome. Decrement the cached counts so that the remainder
		// tracks failed events.
		a.metrics.EventsProcessed.Add(context.Background(), harvestStats.eventsTotal, commonAttrsOpt, outcomeAttrOpt)
		cachedEventsStats[cmk.ID] -= harvestStats.eventsTotal
	}
	if len(harvestErrs) > 0 {
		// Each harvest error represents the failed processing of one aggregated metric.
		err = errors.Join(err, fmt.Errorf(
			"failed to process %d out of %d metrics:\n%w",
			len(harvestErrs), cmCount+len(harvestErrs), errors.Join(harvestErrs...),
		))
	}
	if hasRangeData {
		// DeleteRange creates range tombstones, so only perform the deletion when
		// the interval is known to contain data.
		if rangeCleanupErr := a.db.DeleteRange(lb, ub, a.writeOptions); rangeCleanupErr != nil {
			err = errors.Join(err, fmt.Errorf("failed to delete processed range: %w", rangeCleanupErr))
		}
	}

	// All remaining events in the cached events map should be failed events.
	// Record these events with a failure outcome.
	for cmID, eventsTotal := range cachedEventsStats {
		if eventsTotal == 0 {
			continue
		}
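		// A negative remainder means more events were harvested for this ID than
		// the cache tracked; warn and skip rather than recording a negative count.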
		if eventsTotal < 0 {
			fields := append([]zap.Field{
				zap.Duration("aggregation_interval_ns", ivl),
				zap.Float64("remaining_events", eventsTotal),
			}, otelKVsToZapFields(a.cfg.CombinedMetricsIDToKVs(cmID))...)
			a.cfg.Logger.Warn(
				"unexpectedly failed to harvest all collected events",
				fields...,
			)
			continue
		}
		attrSetOpt := metric.WithAttributeSet(
			attribute.NewSet(append(
				a.cfg.CombinedMetricsIDToKVs(cmID),
				ivlAttr,
				telemetry.WithFailure(),
			)...),
		)
		a.metrics.EventsProcessed.Add(context.Background(), eventsTotal, attrSetOpt)
	}
	return cmCount, err
}
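
For context, below is a minimal sketch of how a caller might drive harvestForInterval across multiple aggregation intervals over a single snapshot. It is an illustration only, not the repository's harvest loop: the method name sketchHarvest, the harvestTime and intervals parameters, and the per-interval shape of the cached event counts are assumptions, and the sketch presumes it lives in the same aggregators package.

// sketchHarvest is a hypothetical illustration (not part of the repository):
// it calls harvestForInterval once per configured aggregation interval, using
// the interval-aligned harvest time as the exclusive upper bound.
func (a *Aggregator) sketchHarvest(
	ctx context.Context,
	snap *pebble.Snapshot,
	harvestTime time.Time,
	intervals []time.Duration,
	cachedEventsStats map[time.Duration]map[[16]byte]float64, // assumed: cached counts keyed by interval
) error {
	var errs []error
	for _, ivl := range intervals {
		// Harvest keys whose processing time falls before the interval-aligned
		// harvest time; the zero lower bound also sweeps up older, unharvested data.
		end := harvestTime.Truncate(ivl)
		cached := cachedEventsStats[ivl]
		if cached == nil {
			// harvestForInterval writes to this map, so it must be non-nil.
			cached = make(map[[16]byte]float64)
		}
		if _, err := a.harvestForInterval(
			ctx, snap, time.Time{}, end, ivl, cached,
		); err != nil {
			errs = append(errs, fmt.Errorf("harvest failed for interval %s: %w", ivl, err))
		}
	}
	return errors.Join(errs...)
}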