in otelcollector/prometheusreceiver/internal/metrics_adjuster.go [262:320]
func (a *initialPointAdjuster) AdjustMetrics(metrics pmetric.Metrics) error {
if removeStartTimeAdjustment.IsEnabled() {
return nil
}
for i := 0; i < metrics.ResourceMetrics().Len(); i++ {
rm := metrics.ResourceMetrics().At(i)
_, found := rm.Resource().Attributes().Get(semconv.AttributeServiceName)
if !found {
return errors.New("adjusting metrics without job")
}
_, found = rm.Resource().Attributes().Get(semconv.AttributeServiceInstanceID)
if !found {
return errors.New("adjusting metrics without instance")
}
}
for i := 0; i < metrics.ResourceMetrics().Len(); i++ {
rm := metrics.ResourceMetrics().At(i)
job, _ := rm.Resource().Attributes().Get(semconv.AttributeServiceName)
instance, _ := rm.Resource().Attributes().Get(semconv.AttributeServiceInstanceID)
tsm := a.jobsMap.get(job.Str(), instance.Str())
// The lock on the relevant timeseriesMap is held throughout the adjustment process to ensure that
// nothing else can modify the data used for adjustment.
tsm.Lock()
for j := 0; j < rm.ScopeMetrics().Len(); j++ {
ilm := rm.ScopeMetrics().At(j)
for k := 0; k < ilm.Metrics().Len(); k++ {
metric := ilm.Metrics().At(k)
switch dataType := metric.Type(); dataType {
case pmetric.MetricTypeGauge:
// gauges don't need to be adjusted so no additional processing is necessary
case pmetric.MetricTypeHistogram:
a.adjustMetricHistogram(tsm, metric)
case pmetric.MetricTypeSummary:
a.adjustMetricSummary(tsm, metric)
case pmetric.MetricTypeSum:
a.adjustMetricSum(tsm, metric)
case pmetric.MetricTypeExponentialHistogram:
a.adjustMetricExponentialHistogram(tsm, metric)
case pmetric.MetricTypeEmpty:
fallthrough
default:
// this shouldn't happen
a.logger.Info("Adjust - skipping unexpected point", zap.String("type", dataType.String()))
}
}
}
tsm.Unlock()
}
return nil
}