in collector/receiver/prometheusreceiver/internal/metrics_adjuster.go [268:313]
func (a *initialPointAdjuster) AdjustMetrics(metrics pmetric.Metrics) error {
// By contract metrics will have at least 1 data point, so for sure will have at least one ResourceMetrics.
job, found := metrics.ResourceMetrics().At(0).Resource().Attributes().Get(semconv.AttributeServiceName)
if !found {
return errors.New("adjusting metrics without job")
}
instance, found := metrics.ResourceMetrics().At(0).Resource().Attributes().Get(semconv.AttributeServiceInstanceID)
if !found {
return errors.New("adjusting metrics without instance")
}
tsm := a.jobsMap.get(job.Str(), instance.Str())
// The lock on the relevant timeseriesMap is held throughout the adjustment process to ensure that
// nothing else can modify the data used for adjustment.
tsm.Lock()
defer tsm.Unlock()
for i := 0; i < metrics.ResourceMetrics().Len(); i++ {
rm := metrics.ResourceMetrics().At(i)
for j := 0; j < rm.ScopeMetrics().Len(); j++ {
ilm := rm.ScopeMetrics().At(j)
for k := 0; k < ilm.Metrics().Len(); k++ {
metric := ilm.Metrics().At(k)
switch dataType := metric.Type(); dataType {
case pmetric.MetricTypeGauge:
// gauges don't need to be adjusted so no additional processing is necessary
case pmetric.MetricTypeHistogram:
a.adjustMetricHistogram(tsm, metric)
case pmetric.MetricTypeSummary:
a.adjustMetricSummary(tsm, metric)
case pmetric.MetricTypeSum:
a.adjustMetricSum(tsm, metric)
default:
// this shouldn't happen
a.logger.Info("Adjust - skipping unexpected point", zap.String("type", dataType.String()))
}
}
}
}
return nil
}