collector/receiver/prometheusreceiver/internal/starttimemetricadjuster.go (123 lines of code) (raw):
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal // import "github.com/GoogleCloudPlatform/run-gmp-sidecar/collector/receiver/prometheusreceiver/internal"
import (
"errors"
"regexp"
"time"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
)
// Sentinel errors returned by getStartTime when no usable start time can be
// read from a batch of metrics.
var (
// errNoStartTimeMetrics is returned when no metric name in the batch matches
// the start time metric.
errNoStartTimeMetrics = errors.New("start_time metric is missing")
// errNoDataPointsStartTimeMetric is returned when the matching gauge or sum
// metric has zero data points.
errNoDataPointsStartTimeMetric = errors.New("start time metric with no data points")
// errUnsupportedTypeStartTimeMetric is returned when the matching metric is
// neither a gauge nor a sum.
errUnsupportedTypeStartTimeMetric = errors.New("unsupported data type for start time metric")
)
// startTimeMetricAdjuster sets the start timestamp of sum, summary and
// histogram data points from the value of a start time metric found in the
// same batch of metrics.
type startTimeMetricAdjuster struct {
// startTimeMetricRegex selects the start time metric by name. When nil, the
// metric name is compared for equality with startTimeMetricName instead.
startTimeMetricRegex *regexp.Regexp
// fallbackStartTime is used when the start time cannot be read from the
// batch. When nil, AdjustMetrics returns the lookup error instead.
fallbackStartTime *time.Time
// resetPointAdjuster, when non-nil, is run on the batch after the start
// timestamps have been set.
resetPointAdjuster *initialPointAdjuster
// logger receives the warnings emitted by AdjustMetrics.
logger *zap.Logger
}
// NewStartTimeMetricAdjuster returns a new MetricsAdjuster that adjusts
// metrics' start times based on a start time metric.
//
// startTimeMetricRegex selects the start time metric by name; when it is nil
// the metric name must equal startTimeMetricName. When
// useCollectorStartTimeFallback is true, the time at which this constructor
// runs is recorded and used whenever a batch yields no start time. When
// allowResets is true, an initialPointAdjuster backed by a jobs map created
// with gcInterval is attached and run after the start times have been set.
func NewStartTimeMetricAdjuster(logger *zap.Logger, gcInterval time.Duration, startTimeMetricRegex *regexp.Regexp, useCollectorStartTimeFallback, allowResets bool) MetricsAdjuster {
	adjuster := &startTimeMetricAdjuster{
		startTimeMetricRegex: startTimeMetricRegex,
		logger:               logger,
	}

	if useCollectorStartTimeFallback {
		// The construction time approximates the collector's start time.
		constructedAt := time.Now()
		adjuster.fallbackStartTime = &constructedAt
	}

	if allowResets {
		adjuster.resetPointAdjuster = &initialPointAdjuster{
			jobsMap:              NewJobsMap(gcInterval),
			logger:               logger,
			useCreatedMetric:     false,
			usePointTimeForReset: true,
		}
	}

	return adjuster
}
// AdjustMetrics sets the start timestamp of every sum, summary, histogram and
// exponential histogram data point in metrics to the start time read from the
// batch's start time metric. Gauges are left untouched.
//
// If the start time cannot be read from the batch, the fallback start time
// (truncated to whole seconds) is used when one is configured and a warning is
// logged; otherwise the lookup error is returned and no data point is
// modified. Metrics of any other type are skipped with a warning.
//
// When a reset point adjuster is configured, it is run on the batch after the
// start timestamps have been set and its error is returned.
func (stma *startTimeMetricAdjuster) AdjustMetrics(metrics pmetric.Metrics) error {
	startTime, err := stma.getStartTime(metrics)
	if err != nil {
		if stma.fallbackStartTime == nil {
			return err
		}
		stma.logger.Warn("Couldn't get start time for metrics. Using fallback start time.", zap.Error(err))
		startTime = float64(stma.fallbackStartTime.Unix())
	}

	startTimeTs := timestampFromFloat64(startTime)
	for i := 0; i < metrics.ResourceMetrics().Len(); i++ {
		rm := metrics.ResourceMetrics().At(i)
		for j := 0; j < rm.ScopeMetrics().Len(); j++ {
			ilm := rm.ScopeMetrics().At(j)
			for k := 0; k < ilm.Metrics().Len(); k++ {
				metric := ilm.Metrics().At(k)
				switch metric.Type() {
				case pmetric.MetricTypeGauge:
					continue

				case pmetric.MetricTypeSum:
					dataPoints := metric.Sum().DataPoints()
					for l := 0; l < dataPoints.Len(); l++ {
						dataPoints.At(l).SetStartTimestamp(startTimeTs)
					}

				case pmetric.MetricTypeSummary:
					dataPoints := metric.Summary().DataPoints()
					for l := 0; l < dataPoints.Len(); l++ {
						dataPoints.At(l).SetStartTimestamp(startTimeTs)
					}

				case pmetric.MetricTypeHistogram:
					dataPoints := metric.Histogram().DataPoints()
					for l := 0; l < dataPoints.Len(); l++ {
						dataPoints.At(l).SetStartTimestamp(startTimeTs)
					}

				case pmetric.MetricTypeExponentialHistogram:
					// Exponential histograms carry a start timestamp like the
					// other distribution types; without this case they were
					// reported as an unknown type and left unadjusted.
					dataPoints := metric.ExponentialHistogram().DataPoints()
					for l := 0; l < dataPoints.Len(); l++ {
						dataPoints.At(l).SetStartTimestamp(startTimeTs)
					}

				default:
					stma.logger.Warn("Unknown metric type", zap.String("type", metric.Type().String()))
				}
			}
		}
	}

	// Handle reset points for cumulative metrics.
	if stma.resetPointAdjuster != nil {
		return stma.resetPointAdjuster.AdjustMetrics(metrics)
	}
	return nil
}
// getStartTime scans metrics for the first metric whose name matches the start
// time metric and returns the double value of that metric's first data point.
//
// It returns errNoStartTimeMetrics when no metric name matches,
// errUnsupportedTypeStartTimeMetric when the first match is neither a gauge
// nor a sum, and errNoDataPointsStartTimeMetric when the first match has no
// data points. Only the first matching metric is ever inspected.
func (stma *startTimeMetricAdjuster) getStartTime(metrics pmetric.Metrics) (float64, error) {
	resources := metrics.ResourceMetrics()
	for r := 0; r < resources.Len(); r++ {
		scopes := resources.At(r).ScopeMetrics()
		for s := 0; s < scopes.Len(); s++ {
			candidates := scopes.At(s).Metrics()
			for m := 0; m < candidates.Len(); m++ {
				candidate := candidates.At(m)
				if !stma.matchStartTimeMetric(candidate.Name()) {
					continue
				}

				// Gauges and sums both expose number data points, so pick the
				// slice first and share the emptiness check and value read.
				var points pmetric.NumberDataPointSlice
				switch candidate.Type() {
				case pmetric.MetricTypeGauge:
					points = candidate.Gauge().DataPoints()
				case pmetric.MetricTypeSum:
					points = candidate.Sum().DataPoints()
				default:
					return 0, errUnsupportedTypeStartTimeMetric
				}

				if points.Len() == 0 {
					return 0.0, errNoDataPointsStartTimeMetric
				}
				return points.At(0).DoubleValue(), nil
			}
		}
	}
	return 0.0, errNoStartTimeMetrics
}
// matchStartTimeMetric reports whether metricName identifies the start time
// metric. Without a configured regular expression the name must equal
// startTimeMetricName; otherwise the regular expression decides.
func (stma *startTimeMetricAdjuster) matchStartTimeMetric(metricName string) bool {
	pattern := stma.startTimeMetricRegex
	if pattern == nil {
		return metricName == startTimeMetricName
	}
	return pattern.MatchString(metricName)
}