in processor/lsmintervalprocessor/processor.go [256:355]
func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
v := merger.NewValue(
p.cfg.ResourceLimit,
p.cfg.ScopeLimit,
p.cfg.MetricLimit,
p.cfg.DatapointLimit,
p.cfg.ExponentialHistogramMaxBuckets,
)
var errs []error
nextMD := pmetric.NewMetrics()
rms := md.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
var nextMDResourceMetrics pmetric.ResourceMetrics
rm := rms.At(i)
sms := rm.ScopeMetrics()
for i := 0; i < sms.Len(); i++ {
var nextMDScopeMetrics pmetric.ScopeMetrics
sm := sms.At(i)
ms := sm.Metrics()
for i := 0; i < ms.Len(); i++ {
m := ms.At(i)
switch t := m.Type(); t {
case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge:
// TODO (lahsivjar): implement support for gauges
//
// For now, pass through by copying across to nextMD below.
break
case pmetric.MetricTypeSummary:
if p.cfg.PassThrough.Summary {
// Copy across to nextMD below.
break
}
if err := v.MergeMetric(rm, sm, m); err != nil {
errs = append(errs, err)
}
continue
case pmetric.MetricTypeSum, pmetric.MetricTypeHistogram, pmetric.MetricTypeExponentialHistogram:
if err := v.MergeMetric(rm, sm, m); err != nil {
errs = append(errs, err)
}
continue
default:
// All metric types are handled, this is unexpected
errs = append(errs, fmt.Errorf("unexpected metric type, dropping: %d", t))
continue
}
if nextMDScopeMetrics == (pmetric.ScopeMetrics{}) {
if nextMDResourceMetrics == (pmetric.ResourceMetrics{}) {
nextMDResourceMetrics = nextMD.ResourceMetrics().AppendEmpty()
rm.Resource().CopyTo(nextMDResourceMetrics.Resource())
nextMDResourceMetrics.SetSchemaUrl(rm.SchemaUrl())
}
nextMDScopeMetrics = nextMDResourceMetrics.ScopeMetrics().AppendEmpty()
sm.Scope().CopyTo(nextMDScopeMetrics.Scope())
nextMDScopeMetrics.SetSchemaUrl(sm.SchemaUrl())
}
m.CopyTo(nextMDScopeMetrics.Metrics().AppendEmpty())
}
}
}
mb, ok := p.bufferPool.Get().(*mergeBuffer)
if !ok {
mb = &mergeBuffer{}
}
defer p.bufferPool.Put(mb)
var err error
mb.value, err = v.AppendBinary(mb.value[:0])
if err != nil {
return errors.Join(append(errs, fmt.Errorf("failed to marshal value to proto binary: %w", err))...)
}
clientInfo := client.FromContext(ctx)
clientMetadata := make([]merger.KeyValues, 0, len(p.sortedMetadataKeys))
for _, k := range p.sortedMetadataKeys {
if values := clientInfo.Metadata.Get(k); len(values) != 0 {
clientMetadata = append(clientMetadata, merger.KeyValues{
Key: k,
Values: values,
})
}
}
if err := p.mergeToBatch(mb, clientMetadata); err != nil {
return fmt.Errorf("failed to merge the value to batch: %w", err)
}
// Call next for the metrics remaining in the input
if err := p.next.ConsumeMetrics(ctx, nextMD); err != nil {
errs = append(errs, err)
}
if len(errs) > 0 {
return errors.Join(errs...)
}
return nil
}