in processor/lsmintervalprocessor/internal/merger/value.go [446:495]
// addResourceMetrics returns the ResourceMetrics entry of s.source that data
// for otherRm's resource should be merged into, creating the entry if needed.
//
// Resolution order:
//  1. If the identity of otherRm's resource is already in s.resLookup, the
//     existing entry is returned.
//  2. If the resource tracker reports an overflow for the identity hash, the
//     overflow entry is returned. It is created on first use with the schema
//     URL of the otherRm that triggered its creation and is decorated with
//     s.resourceLimitCfg.Overflow.Attributes.
//  3. Otherwise a new entry is appended carrying otherRm's schema URL and a
//     copy of its resource; the scope metrics of otherRm are not copied.
//
// s.trackers is created lazily from the configured max cardinalities the
// first time a lookup misses.
//
// It returns the identity under which the entry is registered in s.resLookup
// together with the entry itself. On error the zero identity and a zero
// pdataResourceMetrics are returned, and any entry this call appended to
// s.source is removed again.
func (s *Value) addResourceMetrics(
	otherRm pmetric.ResourceMetrics,
) (identity.Resource, pdataResourceMetrics, error) {
	resID := identity.OfResource(otherRm.Resource())
	if rm, ok := s.resLookup[resID]; ok {
		return resID, rm, nil
	}
	if s.trackers == nil {
		s.trackers = limits.NewTrackers(
			uint64(s.resourceLimitCfg.MaxCardinality),
			uint64(s.scopeLimitCfg.MaxCardinality),
			uint64(s.metricLimitCfg.MaxCardinality),
			uint64(s.datapointLimitCfg.MaxCardinality),
		)
	}
	if s.trackers.GetResourceTracker().CheckOverflow(resID.Hash) {
		// Overflow, get/prepare an overflow bucket
		overflowResID, err := s.getOverflowResourceIdentity()
		if err != nil {
			return identity.Resource{}, pdataResourceMetrics{}, err
		}
		if rm, ok := s.resLookup[overflowResID]; ok {
			return overflowResID, rm, nil
		}
		rms := s.source.ResourceMetrics()
		overflowRm := rms.AppendEmpty()
		overflowRm.SetSchemaUrl(otherRm.SchemaUrl())
		if err := decorate(
			overflowRm.Resource().Attributes(),
			s.resourceLimitCfg.Overflow.Attributes,
		); err != nil {
			// Roll back the entry appended above. It is not registered in
			// s.resLookup, so leaving it behind would orphan it in s.source
			// and every subsequent overflowing call would append another one.
			last := rms.Len() - 1
			idx := 0
			rms.RemoveIf(func(pmetric.ResourceMetrics) bool {
				drop := idx == last
				idx++
				return drop
			})
			return identity.Resource{}, pdataResourceMetrics{}, err
		}
		rm := pdataResourceMetrics{
			ResourceMetrics: overflowRm,
			scopeTracker:    s.trackers.NewScopeTracker(),
		}
		s.resLookup[overflowResID] = rm
		return overflowResID, rm, nil
	}
	// Clone it *without* the ScopeMetricsSlice data
	rmOrig := s.source.ResourceMetrics().AppendEmpty()
	rmOrig.SetSchemaUrl(otherRm.SchemaUrl())
	otherRm.Resource().CopyTo(rmOrig.Resource())
	rm := pdataResourceMetrics{
		ResourceMetrics: rmOrig,
		scopeTracker:    s.trackers.NewScopeTracker(),
	}
	s.resLookup[resID] = rm
	return resID, rm, nil
}