func()

in processor/lsmintervalprocessor/processor.go [256:355]


// ConsumeMetrics splits md into metrics that are merged for aggregation and
// metrics that are passed through to the next consumer unchanged.
//
// Sum, histogram and exponential histogram metrics are always merged into a
// merger.Value. Summary metrics are merged unless p.cfg.PassThrough.Summary is
// set. Empty and gauge metrics (and summaries when pass-through is enabled)
// are copied, together with their resource and scope, into a fresh
// pmetric.Metrics that is handed to p.next. Metrics of any other type are
// dropped and reported as an error.
//
// Per-metric failures do not stop processing: they are collected and returned
// joined with any later failure. A failure to marshal the merged value or to
// merge it into the batch returns immediately, without calling p.next, and the
// returned error also includes the per-metric failures collected so far.
func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
	v := merger.NewValue(
		p.cfg.ResourceLimit,
		p.cfg.ScopeLimit,
		p.cfg.MetricLimit,
		p.cfg.DatapointLimit,
		p.cfg.ExponentialHistogramMaxBuckets,
	)

	var errs []error
	nextMD := pmetric.NewMetrics()
	rms := md.ResourceMetrics()
	for i := 0; i < rms.Len(); i++ {
		// The pass-through resource is created lazily so that resources with
		// no pass-through metrics do not appear in nextMD.
		var nextMDResourceMetrics pmetric.ResourceMetrics
		rm := rms.At(i)
		sms := rm.ScopeMetrics()
		for j := 0; j < sms.Len(); j++ {
			// Same lazy creation as above, one level down.
			var nextMDScopeMetrics pmetric.ScopeMetrics
			sm := sms.At(j)
			ms := sm.Metrics()
			for k := 0; k < ms.Len(); k++ {
				m := ms.At(k)
				// Cases that finish handling the metric `continue` the loop;
				// cases that leave the switch normally fall to the copy into
				// nextMD below.
				switch t := m.Type(); t {
				case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge:
					// TODO (lahsivjar): implement support for gauges
					//
					// For now, pass through by copying across to nextMD below.
				case pmetric.MetricTypeSummary:
					if !p.cfg.PassThrough.Summary {
						if err := v.MergeMetric(rm, sm, m); err != nil {
							errs = append(errs, err)
						}
						continue
					}
					// Copy across to nextMD below.
				case pmetric.MetricTypeSum, pmetric.MetricTypeHistogram, pmetric.MetricTypeExponentialHistogram:
					if err := v.MergeMetric(rm, sm, m); err != nil {
						errs = append(errs, err)
					}
					continue
				default:
					// All metric types are handled, this is unexpected
					errs = append(errs, fmt.Errorf("unexpected metric type, dropping: %d", t))
					continue
				}

				// First pass-through metric for this scope (and possibly for
				// this resource): create the destination containers and copy
				// the identifying resource/scope data across.
				if nextMDScopeMetrics == (pmetric.ScopeMetrics{}) {
					if nextMDResourceMetrics == (pmetric.ResourceMetrics{}) {
						nextMDResourceMetrics = nextMD.ResourceMetrics().AppendEmpty()
						rm.Resource().CopyTo(nextMDResourceMetrics.Resource())
						nextMDResourceMetrics.SetSchemaUrl(rm.SchemaUrl())
					}
					nextMDScopeMetrics = nextMDResourceMetrics.ScopeMetrics().AppendEmpty()
					sm.Scope().CopyTo(nextMDScopeMetrics.Scope())
					nextMDScopeMetrics.SetSchemaUrl(sm.SchemaUrl())
				}
				m.CopyTo(nextMDScopeMetrics.Metrics().AppendEmpty())
			}
		}
	}

	// Reuse a pooled buffer for the encoded value; fall back to a fresh one if
	// the pool returns something that is not a *mergeBuffer.
	mb, ok := p.bufferPool.Get().(*mergeBuffer)
	if !ok {
		mb = &mergeBuffer{}
	}
	defer p.bufferPool.Put(mb)

	var err error
	mb.value, err = v.AppendBinary(mb.value[:0])
	if err != nil {
		return errors.Join(append(errs, fmt.Errorf("failed to marshal value to proto binary: %w", err))...)
	}

	// Collect the configured client metadata keys that carry at least one
	// value on this request, in the order given by p.sortedMetadataKeys.
	clientInfo := client.FromContext(ctx)
	clientMetadata := make([]merger.KeyValues, 0, len(p.sortedMetadataKeys))
	for _, k := range p.sortedMetadataKeys {
		if values := clientInfo.Metadata.Get(k); len(values) != 0 {
			clientMetadata = append(clientMetadata, merger.KeyValues{
				Key:    k,
				Values: values,
			})
		}
	}

	if err := p.mergeToBatch(mb, clientMetadata); err != nil {
		// Keep the per-metric errors collected above instead of dropping them,
		// matching the marshal failure path.
		return errors.Join(append(errs, fmt.Errorf("failed to merge the value to batch: %w", err))...)
	}

	// Call next for the metrics remaining in the input
	if err := p.next.ConsumeMetrics(ctx, nextMD); err != nil {
		errs = append(errs, err)
	}

	if len(errs) > 0 {
		return errors.Join(errs...)
	}
	return nil
}