func()

in otelcollector/prometheusreceiver/internal/metrics_adjuster.go [262:320]


func (a *initialPointAdjuster) AdjustMetrics(metrics pmetric.Metrics) error {
	if removeStartTimeAdjustment.IsEnabled() {
		return nil
	}
	for i := 0; i < metrics.ResourceMetrics().Len(); i++ {
		rm := metrics.ResourceMetrics().At(i)
		_, found := rm.Resource().Attributes().Get(semconv.AttributeServiceName)
		if !found {
			return errors.New("adjusting metrics without job")
		}

		_, found = rm.Resource().Attributes().Get(semconv.AttributeServiceInstanceID)
		if !found {
			return errors.New("adjusting metrics without instance")
		}
	}

	for i := 0; i < metrics.ResourceMetrics().Len(); i++ {
		rm := metrics.ResourceMetrics().At(i)
		job, _ := rm.Resource().Attributes().Get(semconv.AttributeServiceName)
		instance, _ := rm.Resource().Attributes().Get(semconv.AttributeServiceInstanceID)
		tsm := a.jobsMap.get(job.Str(), instance.Str())

		// The lock on the relevant timeseriesMap is held throughout the adjustment process to ensure that
		// nothing else can modify the data used for adjustment.
		tsm.Lock()
		for j := 0; j < rm.ScopeMetrics().Len(); j++ {
			ilm := rm.ScopeMetrics().At(j)
			for k := 0; k < ilm.Metrics().Len(); k++ {
				metric := ilm.Metrics().At(k)
				switch dataType := metric.Type(); dataType {
				case pmetric.MetricTypeGauge:
					// gauges don't need to be adjusted so no additional processing is necessary

				case pmetric.MetricTypeHistogram:
					a.adjustMetricHistogram(tsm, metric)

				case pmetric.MetricTypeSummary:
					a.adjustMetricSummary(tsm, metric)

				case pmetric.MetricTypeSum:
					a.adjustMetricSum(tsm, metric)

				case pmetric.MetricTypeExponentialHistogram:
					a.adjustMetricExponentialHistogram(tsm, metric)

				case pmetric.MetricTypeEmpty:
					fallthrough

				default:
					// this shouldn't happen
					a.logger.Info("Adjust - skipping unexpected point", zap.String("type", dataType.String()))
				}
			}
		}
		tsm.Unlock()
	}
	return nil
}