in processor/lsmintervalprocessor/internal/merger/value.go [342:440]
// initLookupTables lazily builds the identity-keyed lookup maps over the
// source metrics so later merges can find resources, scopes, metrics, and
// data points in O(1). It is idempotent: repeated calls are no-ops.
func (s *Value) initLookupTables() {
	if s.lookupsInitialized {
		return
	}
	s.lookupsInitialized = true

	// Once initialization is requested the structural lookup maps are
	// always needed, even when the source holds no resource metrics.
	s.resLookup = make(map[identity.Resource]pdataResourceMetrics)
	s.scopeLookup = make(map[identity.Scope]pdataScopeMetrics)
	s.metricLookup = make(map[identity.Metric]pdataMetric)

	rms := s.source.ResourceMetrics()
	if rms.Len() == 0 {
		return // nothing to index
	}

	// The marshaled data is assumed to already respect the configured
	// limits, so entries are written straight into the maps without any
	// overflow checks; running the checks here would wrongly account
	// overflow entries as normal buckets.
	for resIdx := 0; resIdx < rms.Len(); resIdx++ {
		rm := rms.At(resIdx)
		resID := identity.OfResource(rm.Resource())
		// Trackers are positional: the i-th resource owns the i-th
		// scope tracker, and so on down the hierarchy.
		scopeTracker := s.trackers.GetScopeTracker(resIdx)
		s.resLookup[resID] = pdataResourceMetrics{ResourceMetrics: rm, scopeTracker: scopeTracker}

		sms := rm.ScopeMetrics()
		for scopeIdx := 0; scopeIdx < sms.Len(); scopeIdx++ {
			sm := sms.At(scopeIdx)
			scopeID := identity.OfScope(resID, sm.Scope())
			metricTracker := scopeTracker.GetMetricTracker(scopeIdx)
			s.scopeLookup[scopeID] = pdataScopeMetrics{ScopeMetrics: sm, metricTracker: metricTracker}

			metrics := sm.Metrics()
			for metricIdx := 0; metricIdx < metrics.Len(); metricIdx++ {
				m := metrics.At(metricIdx)
				metricID := identity.OfMetric(scopeID, m)
				s.metricLookup[metricID] = pdataMetric{
					Metric:           m,
					datapointTracker: metricTracker.GetDatapointTracker(metricIdx),
				}

				// Per-type stream maps are created lazily, only for
				// metric types actually present in the source.
				//exhaustive:enforce
				switch m.Type() {
				case pmetric.MetricTypeEmpty:
					continue
				case pmetric.MetricTypeGauge:
					// TODO (lahsivjar): implement gauge support
				case pmetric.MetricTypeSum:
					if s.numberLookup == nil {
						s.numberLookup = make(map[identity.Stream]pmetric.NumberDataPoint)
					}
					dps := m.Sum().DataPoints()
					for dpIdx := 0; dpIdx < dps.Len(); dpIdx++ {
						dp := dps.At(dpIdx)
						s.numberLookup[identity.OfStream(metricID, dp)] = dp
					}
				case pmetric.MetricTypeSummary:
					if s.summaryLookup == nil {
						s.summaryLookup = make(map[identity.Stream]pmetric.SummaryDataPoint)
					}
					dps := m.Summary().DataPoints()
					for dpIdx := 0; dpIdx < dps.Len(); dpIdx++ {
						dp := dps.At(dpIdx)
						s.summaryLookup[identity.OfStream(metricID, dp)] = dp
					}
				case pmetric.MetricTypeHistogram:
					if s.histoLookup == nil {
						s.histoLookup = make(map[identity.Stream]pmetric.HistogramDataPoint)
					}
					dps := m.Histogram().DataPoints()
					for dpIdx := 0; dpIdx < dps.Len(); dpIdx++ {
						dp := dps.At(dpIdx)
						s.histoLookup[identity.OfStream(metricID, dp)] = dp
					}
				case pmetric.MetricTypeExponentialHistogram:
					if s.expHistoLookup == nil {
						s.expHistoLookup = make(map[identity.Stream]pmetric.ExponentialHistogramDataPoint)
					}
					dps := m.ExponentialHistogram().DataPoints()
					for dpIdx := 0; dpIdx < dps.Len(); dpIdx++ {
						dp := dps.At(dpIdx)
						s.expHistoLookup[identity.OfStream(metricID, dp)] = dp
					}
				}
			}
		}
	}
}