collector/receiver/prometheusreceiver/internal/starttimemetricadjuster.go (123 lines of code) (raw):

// Copyright The OpenTelemetry Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package internal // import "github.com/GoogleCloudPlatform/run-gmp-sidecar/collector/receiver/prometheusreceiver/internal" import ( "errors" "regexp" "time" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" ) var ( errNoStartTimeMetrics = errors.New("start_time metric is missing") errNoDataPointsStartTimeMetric = errors.New("start time metric with no data points") errUnsupportedTypeStartTimeMetric = errors.New("unsupported data type for start time metric") ) type startTimeMetricAdjuster struct { startTimeMetricRegex *regexp.Regexp fallbackStartTime *time.Time resetPointAdjuster *initialPointAdjuster logger *zap.Logger } // NewStartTimeMetricAdjuster returns a new MetricsAdjuster that adjust metrics' start times based on a start time metric. func NewStartTimeMetricAdjuster(logger *zap.Logger, gcInterval time.Duration, startTimeMetricRegex *regexp.Regexp, useCollectorStartTimeFallback, allowResets bool) MetricsAdjuster { // Approximate the start time of the collector and use that as the fallback. var fallbackStartTime *time.Time if useCollectorStartTimeFallback { now := time.Now() fallbackStartTime = &now } var resetPointAdjuster *initialPointAdjuster if allowResets { resetPointAdjuster = &initialPointAdjuster{ jobsMap: NewJobsMap(gcInterval), logger: logger, useCreatedMetric: false, usePointTimeForReset: true, } } return &startTimeMetricAdjuster{ startTimeMetricRegex: startTimeMetricRegex, fallbackStartTime: fallbackStartTime, resetPointAdjuster: resetPointAdjuster, logger: logger, } } func (stma *startTimeMetricAdjuster) AdjustMetrics(metrics pmetric.Metrics) error { startTime, err := stma.getStartTime(metrics) if err != nil { if stma.fallbackStartTime == nil { return err } stma.logger.Warn("Couldn't get start time for metrics. Using fallback start time.", zap.Error(err)) startTime = float64(stma.fallbackStartTime.Unix()) } startTimeTs := timestampFromFloat64(startTime) for i := 0; i < metrics.ResourceMetrics().Len(); i++ { rm := metrics.ResourceMetrics().At(i) for j := 0; j < rm.ScopeMetrics().Len(); j++ { ilm := rm.ScopeMetrics().At(j) for k := 0; k < ilm.Metrics().Len(); k++ { metric := ilm.Metrics().At(k) switch metric.Type() { case pmetric.MetricTypeGauge: continue case pmetric.MetricTypeSum: dataPoints := metric.Sum().DataPoints() for l := 0; l < dataPoints.Len(); l++ { dp := dataPoints.At(l) dp.SetStartTimestamp(startTimeTs) } case pmetric.MetricTypeSummary: dataPoints := metric.Summary().DataPoints() for l := 0; l < dataPoints.Len(); l++ { dp := dataPoints.At(l) dp.SetStartTimestamp(startTimeTs) } case pmetric.MetricTypeHistogram: dataPoints := metric.Histogram().DataPoints() for l := 0; l < dataPoints.Len(); l++ { dp := dataPoints.At(l) dp.SetStartTimestamp(startTimeTs) } default: stma.logger.Warn("Unknown metric type", zap.String("type", metric.Type().String())) } } } } // Handle reset points for cumulative metrics. if stma.resetPointAdjuster != nil { return stma.resetPointAdjuster.AdjustMetrics(metrics) } return nil } func (stma *startTimeMetricAdjuster) getStartTime(metrics pmetric.Metrics) (float64, error) { for i := 0; i < metrics.ResourceMetrics().Len(); i++ { rm := metrics.ResourceMetrics().At(i) for j := 0; j < rm.ScopeMetrics().Len(); j++ { ilm := rm.ScopeMetrics().At(j) for k := 0; k < ilm.Metrics().Len(); k++ { metric := ilm.Metrics().At(k) if stma.matchStartTimeMetric(metric.Name()) { switch metric.Type() { case pmetric.MetricTypeGauge: if metric.Gauge().DataPoints().Len() == 0 { return 0.0, errNoDataPointsStartTimeMetric } return metric.Gauge().DataPoints().At(0).DoubleValue(), nil case pmetric.MetricTypeSum: if metric.Sum().DataPoints().Len() == 0 { return 0.0, errNoDataPointsStartTimeMetric } return metric.Sum().DataPoints().At(0).DoubleValue(), nil default: return 0, errUnsupportedTypeStartTimeMetric } } } } } return 0.0, errNoStartTimeMetrics } func (stma *startTimeMetricAdjuster) matchStartTimeMetric(metricName string) bool { if stma.startTimeMetricRegex != nil { return stma.startTimeMetricRegex.MatchString(metricName) } return metricName == startTimeMetricName }