otelcollector/prometheusreceiver/internal/starttimemetricadjuster.go (135 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"
import (
"errors"
"regexp"
"time"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
)
var (
errNoStartTimeMetrics = errors.New("start_time metric is missing")
errNoDataPointsStartTimeMetric = errors.New("start time metric with no data points")
errUnsupportedTypeStartTimeMetric = errors.New("unsupported data type for start time metric")
// approximateCollectorStartTime is the approximate start time of the
// collector. Used as a fallback start time for metrics that don't have a
// start time set (when the
// receiver.prometheusreceiver.UseCollectorStartTimeFallback feature gate is
// enabled). Set when the component is initialized.
approximateCollectorStartTime time.Time
)
var useCollectorStartTimeFallbackGate = featuregate.GlobalRegistry().MustRegister(
"receiver.prometheusreceiver.UseCollectorStartTimeFallback",
featuregate.StageAlpha,
featuregate.WithRegisterDescription("When enabled, the Prometheus receiver's"+
" start time metric adjuster will fallback to using the collector start time"+
" when a start time is not available"),
)
func init() {
approximateCollectorStartTime = time.Now()
}
type startTimeMetricAdjuster struct {
startTimeMetricRegex *regexp.Regexp
resetPointAdjuster *initialPointAdjuster
logger *zap.Logger
}
// NewStartTimeMetricAdjuster returns a new MetricsAdjuster that adjust metrics' start times based on a start time metric.
func NewStartTimeMetricAdjuster(logger *zap.Logger, startTimeMetricRegex *regexp.Regexp, gcInterval time.Duration) MetricsAdjuster {
resetPointAdjuster := &initialPointAdjuster{
jobsMap: NewJobsMap(gcInterval),
logger: logger,
useCreatedMetric: false,
usePointTimeForReset: true,
}
return &startTimeMetricAdjuster{
startTimeMetricRegex: startTimeMetricRegex,
resetPointAdjuster: resetPointAdjuster,
logger: logger,
}
}
func (stma *startTimeMetricAdjuster) AdjustMetrics(metrics pmetric.Metrics) error {
if removeStartTimeAdjustment.IsEnabled() {
return nil
}
startTime, err := stma.getStartTime(metrics)
if err != nil {
if !useCollectorStartTimeFallbackGate.IsEnabled() {
return err
}
stma.logger.Info("Couldn't get start time for metrics. Using fallback start time.", zap.Error(err), zap.Time("fallback_start_time", approximateCollectorStartTime))
startTime = float64(approximateCollectorStartTime.Unix())
}
startTimeTs := timestampFromFloat64(startTime)
for i := 0; i < metrics.ResourceMetrics().Len(); i++ {
rm := metrics.ResourceMetrics().At(i)
for j := 0; j < rm.ScopeMetrics().Len(); j++ {
ilm := rm.ScopeMetrics().At(j)
for k := 0; k < ilm.Metrics().Len(); k++ {
metric := ilm.Metrics().At(k)
switch metric.Type() {
case pmetric.MetricTypeGauge:
continue
case pmetric.MetricTypeSum:
dataPoints := metric.Sum().DataPoints()
for l := 0; l < dataPoints.Len(); l++ {
dp := dataPoints.At(l)
dp.SetStartTimestamp(startTimeTs)
}
case pmetric.MetricTypeSummary:
dataPoints := metric.Summary().DataPoints()
for l := 0; l < dataPoints.Len(); l++ {
dp := dataPoints.At(l)
dp.SetStartTimestamp(startTimeTs)
}
case pmetric.MetricTypeHistogram:
dataPoints := metric.Histogram().DataPoints()
for l := 0; l < dataPoints.Len(); l++ {
dp := dataPoints.At(l)
dp.SetStartTimestamp(startTimeTs)
}
case pmetric.MetricTypeExponentialHistogram:
dataPoints := metric.ExponentialHistogram().DataPoints()
for l := 0; l < dataPoints.Len(); l++ {
dp := dataPoints.At(l)
dp.SetStartTimestamp(startTimeTs)
}
case pmetric.MetricTypeEmpty:
fallthrough
default:
stma.logger.Warn("Unknown metric type", zap.String("type", metric.Type().String()))
}
}
}
}
// Handle resets.
return stma.resetPointAdjuster.AdjustMetrics(metrics)
}
func (stma *startTimeMetricAdjuster) getStartTime(metrics pmetric.Metrics) (float64, error) {
for i := 0; i < metrics.ResourceMetrics().Len(); i++ {
rm := metrics.ResourceMetrics().At(i)
for j := 0; j < rm.ScopeMetrics().Len(); j++ {
ilm := rm.ScopeMetrics().At(j)
for k := 0; k < ilm.Metrics().Len(); k++ {
metric := ilm.Metrics().At(k)
if stma.matchStartTimeMetric(metric.Name()) {
switch metric.Type() {
case pmetric.MetricTypeGauge:
if metric.Gauge().DataPoints().Len() == 0 {
return 0.0, errNoDataPointsStartTimeMetric
}
return metric.Gauge().DataPoints().At(0).DoubleValue(), nil
case pmetric.MetricTypeSum:
if metric.Sum().DataPoints().Len() == 0 {
return 0.0, errNoDataPointsStartTimeMetric
}
return metric.Sum().DataPoints().At(0).DoubleValue(), nil
case pmetric.MetricTypeEmpty, pmetric.MetricTypeHistogram, pmetric.MetricTypeExponentialHistogram, pmetric.MetricTypeSummary:
fallthrough
default:
return 0, errUnsupportedTypeStartTimeMetric
}
}
}
}
}
return 0.0, errNoStartTimeMetrics
}
func (stma *startTimeMetricAdjuster) matchStartTimeMetric(metricName string) bool {
if stma.startTimeMetricRegex != nil {
return stma.startTimeMetricRegex.MatchString(metricName)
}
return metricName == startTimeMetricName
}