func()

in processor/lsmintervalprocessor/processor.go [445:574]


func (p *Processor) exportForInterval(
	ctx context.Context,
	snap *pebble.Snapshot,
	start, end time.Time,
	ivl intervalDef,
) (int, error) {
	var boundsBuffer []byte
	from := merger.Key{Interval: ivl.Duration, ProcessingTime: start}
	boundsBuffer, err := from.AppendBinary(nil)
	if err != nil {
		return 0, fmt.Errorf("failed to encode range: %w", err)
	}
	lb := boundsBuffer[:]

	to := merger.Key{Interval: ivl.Duration, ProcessingTime: end}
	boundsBuffer, err = to.AppendBinary(boundsBuffer)
	if err != nil {
		return 0, fmt.Errorf("failed to encode range: %w", err)
	}
	ub := boundsBuffer[len(lb):]

	iter, err := snap.NewIter(&pebble.IterOptions{
		LowerBound: lb,
		UpperBound: ub,
		KeyTypes:   pebble.IterKeyTypePointsOnly,
	})
	if err != nil {
		return 0, fmt.Errorf("failed to create iterator: %w", err)
	}
	defer iter.Close()

	var errs []error
	var exportedDPCount int
	rangeHasData := iter.First()
	for ; iter.Valid(); iter.Next() {
		v := merger.NewValue(
			p.cfg.ResourceLimit,
			p.cfg.ScopeLimit,
			p.cfg.MetricLimit,
			p.cfg.DatapointLimit,
			p.cfg.ExponentialHistogramMaxBuckets,
		)
		var key merger.Key
		if err := key.Unmarshal(iter.Key()); err != nil {
			errs = append(errs, fmt.Errorf("failed to decode key from database: %w", err))
			continue
		}
		if err := v.Unmarshal(iter.Value()); err != nil {
			errs = append(errs, fmt.Errorf("failed to decode value from database: %w", err))
			continue
		}
		finalMetrics, err := v.Finalize()
		if err != nil {
			errs = append(errs, fmt.Errorf("failed to finalize merged metric: %w", err))
			continue
		}
		resourceMetrics := finalMetrics.ResourceMetrics()
		if ivl.Statements != nil {
			for i := 0; i < resourceMetrics.Len(); i++ {
				res := resourceMetrics.At(i)
				scopeMetrics := res.ScopeMetrics()
				for j := 0; j < scopeMetrics.Len(); j++ {
					scope := scopeMetrics.At(j)
					metrics := scope.Metrics()
					for k := 0; k < metrics.Len(); k++ {
						metric := metrics.At(k)
						executeTransform := func(dp any) {
							dCtx := ottldatapoint.NewTransformContext(dp, metric, metrics, scope.Scope(), res.Resource(), scope, res)
							if err := ivl.Statements.Execute(ctx, dCtx); err != nil {
								errs = append(errs, fmt.Errorf("failed to execute ottl statement for interval %s: %w", ivl.Duration, err))
							}
						}
						// TODO (lahsivjar): add exhaustive:enforce lint rule
						//exhaustive:enforce
						switch metric.Type() {
						case pmetric.MetricTypeGauge:
							dps := metric.Gauge().DataPoints()
							for l := 0; l < dps.Len(); l++ {
								executeTransform(dps.At(l))
							}
						case pmetric.MetricTypeSum:
							dps := metric.Sum().DataPoints()
							for l := 0; l < dps.Len(); l++ {
								executeTransform(dps.At(l))
							}
						case pmetric.MetricTypeSummary:
							dps := metric.Summary().DataPoints()
							for l := 0; l < dps.Len(); l++ {
								executeTransform(dps.At(l))
							}
						case pmetric.MetricTypeHistogram:
							dps := metric.Histogram().DataPoints()
							for l := 0; l < dps.Len(); l++ {
								executeTransform(dps.At(l))
							}
						case pmetric.MetricTypeExponentialHistogram:
							dps := metric.ExponentialHistogram().DataPoints()
							for l := 0; l < dps.Len(); l++ {
								executeTransform(dps.At(l))
							}
						}
					}
				}
			}
		}
		if n := len(key.Metadata); n != 0 {
			metadataMap := make(map[string][]string, n)
			for _, kvs := range key.Metadata {
				metadataMap[kvs.Key] = kvs.Values
			}
			info := client.FromContext(ctx)
			info.Metadata = client.NewMetadata(metadataMap)
			ctx = client.NewContext(ctx, info)
		}
		if err := p.next.ConsumeMetrics(ctx, finalMetrics); err != nil {
			errs = append(errs, fmt.Errorf("failed to consume the decoded value: %w", err))
			continue
		}
		exportedDPCount += finalMetrics.DataPointCount()
	}
	if rangeHasData {
		if err := p.db.DeleteRange(lb, ub, p.wOpts); err != nil {
			errs = append(errs, fmt.Errorf("failed to delete exported entries: %w", err))
		}
	}
	if len(errs) > 0 {
		return exportedDPCount, errors.Join(errs...)
	}
	return exportedDPCount, nil
}