in processor/lsmintervalprocessor/processor.go [445:574]
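// exportForInterval exports all merged values for the given interval whose
// processing time falls in [start, end): it finalizes each stored value,
// applies the interval's OTTL statements (if any), forwards the result to the
// next consumer, and finally deletes the exported key range from the live DB.
// It returns the number of data points successfully exported.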
func (p *Processor) exportForInterval(
    ctx context.Context,
    snap *pebble.Snapshot,
    start, end time.Time,
    ivl intervalDef,
) (int, error) {
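    // Encode both range bounds into one buffer: lb is the encoded lower
    // bound, ub the encoded upper bound appended after it.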
    from := merger.Key{Interval: ivl.Duration, ProcessingTime: start}
    boundsBuffer, err := from.AppendBinary(nil)
    if err != nil {
        return 0, fmt.Errorf("failed to encode range lower bound: %w", err)
    }
    lb := boundsBuffer
    to := merger.Key{Interval: ivl.Duration, ProcessingTime: end}
    boundsBuffer, err = to.AppendBinary(boundsBuffer)
    if err != nil {
        return 0, fmt.Errorf("failed to encode range upper bound: %w", err)
    }
    ub := boundsBuffer[len(lb):]
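    // Iterate point keys only, against the snapshot, so the export sees a
    // stable view of the range.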
    iter, err := snap.NewIter(&pebble.IterOptions{
        LowerBound: lb,
        UpperBound: ub,
        KeyTypes:   pebble.IterKeyTypePointsOnly,
    })
    if err != nil {
        return 0, fmt.Errorf("failed to create iterator: %w", err)
    }
    defer iter.Close()
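    // Collect per-entry errors and keep going: one undecodable or
    // unconsumable entry should not block the rest of the range. iter.First()
    // both positions the iterator and notes whether the range holds any data.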
    var errs []error
    var exportedDPCount int
    rangeHasData := iter.First()
    for ; iter.Valid(); iter.Next() {
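        // Rebuild a merger.Value with the configured limits, then decode this
        // entry's key and stored value.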
        v := merger.NewValue(
            p.cfg.ResourceLimit,
            p.cfg.ScopeLimit,
            p.cfg.MetricLimit,
            p.cfg.DatapointLimit,
            p.cfg.ExponentialHistogramMaxBuckets,
        )
        var key merger.Key
        if err := key.Unmarshal(iter.Key()); err != nil {
            errs = append(errs, fmt.Errorf("failed to decode key from database: %w", err))
            continue
        }
        if err := v.Unmarshal(iter.Value()); err != nil {
            errs = append(errs, fmt.Errorf("failed to decode value from database: %w", err))
            continue
        }
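        // Finalize turns the merged, size-limited representation into
        // pmetric.Metrics ready for export.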
        finalMetrics, err := v.Finalize()
        if err != nil {
            errs = append(errs, fmt.Errorf("failed to finalize merged metric: %w", err))
            continue
        }
        resourceMetrics := finalMetrics.ResourceMetrics()
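        // If the interval defines OTTL statements, run them over every data
        // point of every metric type before export.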
        if ivl.Statements != nil {
            for i := 0; i < resourceMetrics.Len(); i++ {
                res := resourceMetrics.At(i)
                scopeMetrics := res.ScopeMetrics()
                for j := 0; j < scopeMetrics.Len(); j++ {
                    scope := scopeMetrics.At(j)
                    metrics := scope.Metrics()
                    for k := 0; k < metrics.Len(); k++ {
                        metric := metrics.At(k)
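                        // executeTransform evaluates the interval's OTTL
                        // statements against one data point with its full
                        // metric/scope/resource context.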
                        executeTransform := func(dp any) {
                            dCtx := ottldatapoint.NewTransformContext(dp, metric, metrics, scope.Scope(), res.Resource(), scope, res)
                            if err := ivl.Statements.Execute(ctx, dCtx); err != nil {
                                errs = append(errs, fmt.Errorf("failed to execute ottl statement for interval %s: %w", ivl.Duration, err))
                            }
                        }
                        // TODO (lahsivjar): add exhaustive:enforce lint rule
                        //exhaustive:enforce
                        switch metric.Type() {
                        case pmetric.MetricTypeGauge:
                            dps := metric.Gauge().DataPoints()
                            for l := 0; l < dps.Len(); l++ {
                                executeTransform(dps.At(l))
                            }
                        case pmetric.MetricTypeSum:
                            dps := metric.Sum().DataPoints()
                            for l := 0; l < dps.Len(); l++ {
                                executeTransform(dps.At(l))
                            }
                        case pmetric.MetricTypeSummary:
                            dps := metric.Summary().DataPoints()
                            for l := 0; l < dps.Len(); l++ {
                                executeTransform(dps.At(l))
                            }
                        case pmetric.MetricTypeHistogram:
                            dps := metric.Histogram().DataPoints()
                            for l := 0; l < dps.Len(); l++ {
                                executeTransform(dps.At(l))
                            }
                        case pmetric.MetricTypeExponentialHistogram:
                            dps := metric.ExponentialHistogram().DataPoints()
                            for l := 0; l < dps.Len(); l++ {
                                executeTransform(dps.At(l))
                            }
                        }
                    }
                }
            }
        }
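        // Re-attach any client metadata stored alongside the key, scoped to
        // this entry so one key's metadata cannot leak into the next.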
        exportCtx := ctx
        if n := len(key.Metadata); n != 0 {
            metadataMap := make(map[string][]string, n)
            for _, kvs := range key.Metadata {
                metadataMap[kvs.Key] = kvs.Values
            }
            info := client.FromContext(ctx)
            info.Metadata = client.NewMetadata(metadataMap)
            exportCtx = client.NewContext(ctx, info)
        }
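        // Forward the finalized batch; only successfully consumed batches
        // count toward exportedDPCount.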
        if err := p.next.ConsumeMetrics(exportCtx, finalMetrics); err != nil {
            errs = append(errs, fmt.Errorf("failed to consume the decoded value: %w", err))
            continue
        }
        exportedDPCount += finalMetrics.DataPointCount()
    }
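    // The snapshot is read-only, so the exported range is deleted from the
    // live DB; the range is dropped even if individual entries failed above.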
    if rangeHasData {
        if err := p.db.DeleteRange(lb, ub, p.wOpts); err != nil {
            errs = append(errs, fmt.Errorf("failed to delete exported entries: %w", err))
        }
    }
    if len(errs) > 0 {
        return exportedDPCount, errors.Join(errs...)
    }
    return exportedDPCount, nil
}
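// A minimal sketch of how a harvest loop might drive exportForInterval,
// assuming a hypothetical p.intervals field (the configured intervals) and a
// precomputed [start, end) harvest window; pebble's DB.NewSnapshot provides
// the consistent view the function expects:
//
//	func (p *Processor) harvest(ctx context.Context, start, end time.Time) error {
//	    snap := p.db.NewSnapshot()
//	    defer snap.Close()
//	    var errs []error
//	    for _, ivl := range p.intervals { // p.intervals is an assumption here
//	        if _, err := p.exportForInterval(ctx, snap, start, end, ivl); err != nil {
//	            errs = append(errs, err)
//	        }
//	    }
//	    return errors.Join(errs...)
//	}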