func()

in processor/lsmintervalprocessor/internal/merger/value.go [446:495]


// addResourceMetrics resolves the resource of otherRm to an entry tracked by
// s, creating that entry when it is not yet known.
//
// Resolution order:
//  1. If the resource identity is already present in s.resLookup, the
//     existing entry is returned unchanged.
//  2. If the resource tracker reports an overflow for the identity, the
//     overflow entry is returned. It is created on first use, takes its
//     schema URL from the otherRm that triggered its creation, and has its
//     resource attributes decorated with the configured overflow attributes.
//  3. Otherwise a new entry is appended to s.source carrying the schema URL
//     and resource of otherRm, but none of its scope metrics.
//
// The returned identity is the one the entry is stored under in s.resLookup,
// i.e. the overflow identity in case 2. A non-nil error is returned only when
// the overflow identity cannot be obtained or the overflow attributes cannot
// be applied; the other results are zero values in that case.
func (s *Value) addResourceMetrics(
	otherRm pmetric.ResourceMetrics,
) (identity.Resource, pdataResourceMetrics, error) {
	id := identity.OfResource(otherRm.Resource())
	if known, found := s.resLookup[id]; found {
		return id, known, nil
	}

	// Trackers are built lazily, the first time an unseen resource arrives.
	if s.trackers == nil {
		maxResources := uint64(s.resourceLimitCfg.MaxCardinality)
		maxScopes := uint64(s.scopeLimitCfg.MaxCardinality)
		maxMetrics := uint64(s.metricLimitCfg.MaxCardinality)
		maxDatapoints := uint64(s.datapointLimitCfg.MaxCardinality)
		s.trackers = limits.NewTrackers(maxResources, maxScopes, maxMetrics, maxDatapoints)
	}

	if !s.trackers.GetResourceTracker().CheckOverflow(id.Hash) {
		// No overflow: clone the resource *without* the ScopeMetricsSlice data.
		dst := s.source.ResourceMetrics().AppendEmpty()
		dst.SetSchemaUrl(otherRm.SchemaUrl())
		otherRm.Resource().CopyTo(dst.Resource())

		entry := pdataResourceMetrics{
			ResourceMetrics: dst,
			scopeTracker:    s.trackers.NewScopeTracker(),
		}
		s.resLookup[id] = entry
		return id, entry, nil
	}

	// Overflow: route this resource to the overflow bucket, preparing the
	// bucket if this is the first time it is needed.
	overflowID, err := s.getOverflowResourceIdentity()
	if err != nil {
		return identity.Resource{}, pdataResourceMetrics{}, err
	}
	if bucket, found := s.resLookup[overflowID]; found {
		return overflowID, bucket, nil
	}

	bucketRm := s.source.ResourceMetrics().AppendEmpty()
	bucketRm.SetSchemaUrl(otherRm.SchemaUrl())
	// NOTE(review): if decorate fails, the entry appended above is not removed
	// from s.source — confirm callers discard the Value on error.
	err = decorate(bucketRm.Resource().Attributes(), s.resourceLimitCfg.Overflow.Attributes)
	if err != nil {
		return identity.Resource{}, pdataResourceMetrics{}, err
	}

	bucket := pdataResourceMetrics{
		ResourceMetrics: bucketRm,
		scopeTracker:    s.trackers.NewScopeTracker(),
	}
	s.resLookup[overflowID] = bucket
	return overflowID, bucket, nil
}