receiver/prometheusreceiver/internal/metricfamily.go (475 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"
import (
"encoding/hex"
"fmt"
"math"
"sort"
"strings"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/scrape"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
)
type metricFamily struct {
mtype pmetric.MetricType
// isMonotonic only applies to sums
isMonotonic bool
groups map[uint64]*metricGroup
name string
metadata *scrape.MetricMetadata
groupOrders []*metricGroup
}
// metricGroup, represents a single metric of a metric family. for example a histogram metric is usually represent by
// a couple data complexValue (buckets and count/sum), a group of a metric family always share a same set of tags. for
// simple types like counter and gauge, each data point is a group of itself
type metricGroup struct {
mtype pmetric.MetricType
ts int64
ls labels.Labels
count float64
hasCount bool
sum float64
hasSum bool
created float64
value float64
hValue *histogram.Histogram
fhValue *histogram.FloatHistogram
complexValue []*dataPoint
exemplars pmetric.ExemplarSlice
}
func newMetricFamily(metricName string, mc scrape.MetricMetadataStore, logger *zap.Logger) *metricFamily {
metadata, familyName := metadataForMetric(metricName, mc)
mtype, isMonotonic := convToMetricType(metadata.Type)
if mtype == pmetric.MetricTypeEmpty {
logger.Debug(fmt.Sprintf("Unknown-typed metric : %s %+v", metricName, metadata))
}
return &metricFamily{
mtype: mtype,
isMonotonic: isMonotonic,
groups: make(map[uint64]*metricGroup),
name: familyName,
metadata: metadata,
}
}
// includesMetric returns true if the metric is part of the family
func (mf *metricFamily) includesMetric(metricName string) bool {
if mf.mtype != pmetric.MetricTypeGauge {
// If it is a merged family type, then it should match the
// family name when suffixes are trimmed.
return normalizeMetricName(metricName) == mf.name
}
// If it isn't a merged type, the metricName and family name should match
return metricName == mf.name
}
func (mg *metricGroup) sortPoints() {
sort.Slice(mg.complexValue, func(i, j int) bool {
return mg.complexValue[i].boundary < mg.complexValue[j].boundary
})
}
func (mg *metricGroup) toDistributionPoint(dest pmetric.HistogramDataPointSlice) {
if !mg.hasCount {
return
}
mg.sortPoints()
bucketCount := len(mg.complexValue) + 1
// if the final bucket is +Inf, we ignore it
if bucketCount > 1 && mg.complexValue[bucketCount-2].boundary == math.Inf(1) {
bucketCount--
}
// for OTLP the bounds won't include +inf
bounds := make([]float64, bucketCount-1)
bucketCounts := make([]uint64, bucketCount)
var adjustedCount float64
pointIsStale := value.IsStaleNaN(mg.sum) || value.IsStaleNaN(mg.count)
for i := 0; i < bucketCount-1; i++ {
bounds[i] = mg.complexValue[i].boundary
adjustedCount = mg.complexValue[i].value
// Buckets still need to be sent to know to set them as stale,
// but a staleness NaN converted to uint64 would be an extremely large number.
// Setting to 0 instead.
if pointIsStale {
adjustedCount = 0
} else if i != 0 {
adjustedCount -= mg.complexValue[i-1].value
}
bucketCounts[i] = uint64(adjustedCount)
}
// Add the final bucket based on the total count
adjustedCount = mg.count
if pointIsStale {
adjustedCount = 0
} else if bucketCount > 1 {
adjustedCount -= mg.complexValue[bucketCount-2].value
}
bucketCounts[bucketCount-1] = uint64(adjustedCount)
point := dest.AppendEmpty()
if pointIsStale {
point.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true))
} else {
point.SetCount(uint64(mg.count))
if mg.hasSum {
point.SetSum(mg.sum)
}
}
point.ExplicitBounds().FromRaw(bounds)
point.BucketCounts().FromRaw(bucketCounts)
// The timestamp MUST be in retrieved from milliseconds and converted to nanoseconds.
tsNanos := timestampFromMs(mg.ts)
if mg.created != 0 {
point.SetStartTimestamp(timestampFromFloat64(mg.created))
} else if !removeStartTimeAdjustment.IsEnabled() {
// metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp
point.SetStartTimestamp(tsNanos)
}
point.SetTimestamp(tsNanos)
populateAttributes(pmetric.MetricTypeHistogram, mg.ls, point.Attributes())
mg.setExemplars(point.Exemplars())
}
// toExponentialHistogramDataPoints is based on
// https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/#exponential-histograms
func (mg *metricGroup) toExponentialHistogramDataPoints(dest pmetric.ExponentialHistogramDataPointSlice) {
if !mg.hasCount {
return
}
point := dest.AppendEmpty()
point.SetTimestamp(timestampFromMs(mg.ts))
// We do not set Min or Max as native histograms don't have that information.
switch {
case mg.fhValue != nil:
fh := mg.fhValue
if value.IsStaleNaN(fh.Sum) {
point.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true))
// The count and sum are initialized to 0, so we don't need to set them.
} else {
point.SetScale(fh.Schema)
// Input is a float native histogram. This conversion will lose
// precision,but we don't actually expect float histograms in scrape,
// since these are typically the result of operations on integer
// native histograms in the database.
point.SetCount(uint64(fh.Count))
point.SetSum(fh.Sum)
point.SetZeroThreshold(fh.ZeroThreshold)
point.SetZeroCount(uint64(fh.ZeroCount))
if len(fh.PositiveSpans) > 0 {
point.Positive().SetOffset(fh.PositiveSpans[0].Offset - 1) // -1 because OTEL offset are for the lower bound, not the upper bound
convertAbsoluteBuckets(fh.PositiveSpans, fh.PositiveBuckets, point.Positive().BucketCounts())
}
if len(fh.NegativeSpans) > 0 {
point.Negative().SetOffset(fh.NegativeSpans[0].Offset - 1) // -1 because OTEL offset are for the lower bound, not the upper bound
convertAbsoluteBuckets(fh.NegativeSpans, fh.NegativeBuckets, point.Negative().BucketCounts())
}
}
case mg.hValue != nil:
h := mg.hValue
if value.IsStaleNaN(h.Sum) {
point.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true))
// The count and sum are initialized to 0, so we don't need to set them.
} else {
point.SetScale(h.Schema)
point.SetCount(h.Count)
point.SetSum(h.Sum)
point.SetZeroThreshold(h.ZeroThreshold)
point.SetZeroCount(h.ZeroCount)
if len(h.PositiveSpans) > 0 {
point.Positive().SetOffset(h.PositiveSpans[0].Offset - 1) // -1 because OTEL offset are for the lower bound, not the upper bound
convertDeltaBuckets(h.PositiveSpans, h.PositiveBuckets, point.Positive().BucketCounts())
}
if len(h.NegativeSpans) > 0 {
point.Negative().SetOffset(h.NegativeSpans[0].Offset - 1) // -1 because OTEL offset are for the lower bound, not the upper bound
convertDeltaBuckets(h.NegativeSpans, h.NegativeBuckets, point.Negative().BucketCounts())
}
}
default:
// This should never happen.
return
}
tsNanos := timestampFromMs(mg.ts)
if mg.created != 0 {
point.SetStartTimestamp(timestampFromFloat64(mg.created))
} else if !removeStartTimeAdjustment.IsEnabled() {
// metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp
point.SetStartTimestamp(tsNanos)
}
point.SetTimestamp(tsNanos)
populateAttributes(pmetric.MetricTypeHistogram, mg.ls, point.Attributes())
mg.setExemplars(point.Exemplars())
}
func convertDeltaBuckets(spans []histogram.Span, deltas []int64, buckets pcommon.UInt64Slice) {
buckets.EnsureCapacity(len(deltas))
bucketIdx := 0
bucketCount := int64(0)
for spanIdx, span := range spans {
if spanIdx > 0 {
for i := int32(0); i < span.Offset; i++ {
buckets.Append(uint64(0))
}
}
for i := uint32(0); i < span.Length; i++ {
bucketCount += deltas[bucketIdx]
bucketIdx++
buckets.Append(uint64(bucketCount))
}
}
}
func convertAbsoluteBuckets(spans []histogram.Span, counts []float64, buckets pcommon.UInt64Slice) {
buckets.EnsureCapacity(len(counts))
bucketIdx := 0
for spanIdx, span := range spans {
if spanIdx > 0 {
for i := int32(0); i < span.Offset; i++ {
buckets.Append(uint64(0))
}
}
for i := uint32(0); i < span.Length; i++ {
buckets.Append(uint64(counts[bucketIdx]))
bucketIdx++
}
}
}
func (mg *metricGroup) setExemplars(exemplars pmetric.ExemplarSlice) {
if mg == nil {
return
}
if mg.exemplars.Len() > 0 {
mg.exemplars.MoveAndAppendTo(exemplars)
}
}
func (mg *metricGroup) toSummaryPoint(dest pmetric.SummaryDataPointSlice) {
// expecting count to be provided, however, in the following two cases, they can be missed.
// 1. data is corrupted
// 2. ignored by startValue evaluation
if !mg.hasCount {
return
}
mg.sortPoints()
point := dest.AppendEmpty()
pointIsStale := value.IsStaleNaN(mg.sum) || value.IsStaleNaN(mg.count)
if pointIsStale {
point.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true))
} else {
if mg.hasSum {
point.SetSum(mg.sum)
}
point.SetCount(uint64(mg.count))
}
quantileValues := point.QuantileValues()
for _, p := range mg.complexValue {
quantile := quantileValues.AppendEmpty()
// Quantiles still need to be sent to know to set them as stale,
// but a staleness NaN converted to uint64 would be an extremely large number.
// By not setting the quantile value, it will default to 0.
if !pointIsStale {
quantile.SetValue(p.value)
}
quantile.SetQuantile(p.boundary)
}
// Based on the summary description from https://prometheus.io/docs/concepts/metric_types/#summary
// the quantiles are calculated over a sliding time window, however, the count is the total count of
// observations and the corresponding sum is a sum of all observed values, thus the sum and count used
// at the global level of the metricspb.SummaryValue
// The timestamp MUST be in retrieved from milliseconds and converted to nanoseconds.
tsNanos := timestampFromMs(mg.ts)
point.SetTimestamp(tsNanos)
if mg.created != 0 {
point.SetStartTimestamp(timestampFromFloat64(mg.created))
} else if !removeStartTimeAdjustment.IsEnabled() {
// metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp
point.SetStartTimestamp(tsNanos)
}
populateAttributes(pmetric.MetricTypeSummary, mg.ls, point.Attributes())
}
func (mg *metricGroup) toNumberDataPoint(dest pmetric.NumberDataPointSlice) {
tsNanos := timestampFromMs(mg.ts)
point := dest.AppendEmpty()
// gauge/undefined types have no start time.
if mg.mtype == pmetric.MetricTypeSum {
if mg.created != 0 {
point.SetStartTimestamp(timestampFromFloat64(mg.created))
} else if !removeStartTimeAdjustment.IsEnabled() {
// metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp
point.SetStartTimestamp(tsNanos)
}
}
point.SetTimestamp(tsNanos)
if value.IsStaleNaN(mg.value) {
point.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true))
} else {
point.SetDoubleValue(mg.value)
}
populateAttributes(pmetric.MetricTypeGauge, mg.ls, point.Attributes())
mg.setExemplars(point.Exemplars())
}
func populateAttributes(mType pmetric.MetricType, ls labels.Labels, dest pcommon.Map) {
dest.EnsureCapacity(ls.Len())
names := getSortedNotUsefulLabels(mType)
j := 0
ls.Range(func(l labels.Label) {
for j < len(names) && names[j] < l.Name {
j++
}
if j < len(names) && l.Name == names[j] {
return
}
if l.Value == "" {
// empty label values should be omitted
return
}
dest.PutStr(l.Name, l.Value)
})
}
func (mf *metricFamily) loadMetricGroupOrCreate(groupKey uint64, ls labels.Labels, ts int64) *metricGroup {
mg, ok := mf.groups[groupKey]
if !ok {
mg = &metricGroup{
mtype: mf.mtype,
ts: ts,
ls: ls,
exemplars: pmetric.NewExemplarSlice(),
}
mf.groups[groupKey] = mg
// maintaining data insertion order is helpful to generate stable/reproducible metric output
mf.groupOrders = append(mf.groupOrders, mg)
}
return mg
}
func (mf *metricFamily) addSeries(seriesRef uint64, metricName string, ls labels.Labels, t int64, v float64) error {
mg := mf.loadMetricGroupOrCreate(seriesRef, ls, t)
if mg.ts != t {
return fmt.Errorf("inconsistent timestamps on metric points for metric %v", metricName)
}
switch mf.mtype {
case pmetric.MetricTypeHistogram, pmetric.MetricTypeSummary:
switch {
case strings.HasSuffix(metricName, metricsSuffixSum):
mg.sum = v
mg.hasSum = true
case strings.HasSuffix(metricName, metricsSuffixCount):
// always use the timestamp from count, because is the only required field for histograms and summaries.
mg.ts = t
mg.count = v
mg.hasCount = true
case metricName == mf.metadata.Metric+metricSuffixCreated:
mg.created = v
default:
boundary, err := getBoundary(mf.mtype, ls)
if err != nil {
return err
}
mg.complexValue = append(mg.complexValue, &dataPoint{value: v, boundary: boundary})
}
case pmetric.MetricTypeExponentialHistogram:
if metricName == mf.metadata.Metric+metricSuffixCreated {
mg.created = v
}
case pmetric.MetricTypeSum:
if metricName == mf.metadata.Metric+metricSuffixCreated {
mg.created = v
} else {
mg.value = v
}
case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge:
fallthrough
default:
mg.value = v
}
return nil
}
func (mf *metricFamily) addCreationTimestamp(seriesRef uint64, ls labels.Labels, atMs, created int64) {
mg := mf.loadMetricGroupOrCreate(seriesRef, ls, atMs)
mg.created = float64(created)
}
func (mf *metricFamily) addExponentialHistogramSeries(seriesRef uint64, metricName string, ls labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) error {
mg := mf.loadMetricGroupOrCreate(seriesRef, ls, t)
if mg.ts != t {
return fmt.Errorf("inconsistent timestamps on metric points for metric %v", metricName)
}
if mg.mtype != pmetric.MetricTypeExponentialHistogram {
return fmt.Errorf("metric type mismatch for exponential histogram metric %v type %s", metricName, mg.mtype.String())
}
switch {
case fh != nil:
if mg.hValue != nil {
return fmt.Errorf("exponential histogram %v already has float counts", metricName)
}
mg.count = fh.Count
mg.sum = fh.Sum
mg.hasCount = true
mg.hasSum = true
mg.fhValue = fh
case h != nil:
if mg.fhValue != nil {
return fmt.Errorf("exponential histogram %v already has integer counts", metricName)
}
mg.count = float64(h.Count)
mg.sum = h.Sum
mg.hasCount = true
mg.hasSum = true
mg.hValue = h
}
return nil
}
func (mf *metricFamily) appendMetric(metrics pmetric.MetricSlice, trimSuffixes bool) {
metric := pmetric.NewMetric()
// Trims type and unit suffixes from metric name
name := mf.name
if trimSuffixes {
name = prometheus.TrimPromSuffixes(name, mf.mtype, mf.metadata.Unit)
}
metric.SetName(name)
metric.SetDescription(mf.metadata.Help)
metric.SetUnit(prometheus.UnitWordToUCUM(mf.metadata.Unit))
metric.Metadata().PutStr(prometheus.MetricMetadataTypeKey, string(mf.metadata.Type))
var pointCount int
switch mf.mtype {
case pmetric.MetricTypeHistogram:
histogram := metric.SetEmptyHistogram()
histogram.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
hdpL := histogram.DataPoints()
for _, mg := range mf.groupOrders {
mg.toDistributionPoint(hdpL)
}
pointCount = hdpL.Len()
case pmetric.MetricTypeSummary:
summary := metric.SetEmptySummary()
sdpL := summary.DataPoints()
for _, mg := range mf.groupOrders {
mg.toSummaryPoint(sdpL)
}
pointCount = sdpL.Len()
case pmetric.MetricTypeSum:
sum := metric.SetEmptySum()
sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
sum.SetIsMonotonic(mf.isMonotonic)
sdpL := sum.DataPoints()
for _, mg := range mf.groupOrders {
mg.toNumberDataPoint(sdpL)
}
pointCount = sdpL.Len()
case pmetric.MetricTypeExponentialHistogram:
histogram := metric.SetEmptyExponentialHistogram()
histogram.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
hdpL := histogram.DataPoints()
for _, mg := range mf.groupOrders {
mg.toExponentialHistogramDataPoints(hdpL)
}
pointCount = hdpL.Len()
case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge:
fallthrough
default: // Everything else should be set to a Gauge.
gauge := metric.SetEmptyGauge()
gdpL := gauge.DataPoints()
for _, mg := range mf.groupOrders {
mg.toNumberDataPoint(gdpL)
}
pointCount = gdpL.Len()
}
if pointCount == 0 {
return
}
metric.MoveTo(metrics.AppendEmpty())
}
func (mf *metricFamily) addExemplar(seriesRef uint64, e exemplar.Exemplar) {
mg := mf.groups[seriesRef]
if mg == nil {
return
}
es := mg.exemplars
convertExemplar(e, es.AppendEmpty())
}
func convertExemplar(pe exemplar.Exemplar, e pmetric.Exemplar) {
e.SetTimestamp(timestampFromMs(pe.Ts))
e.SetDoubleValue(pe.Value)
e.FilteredAttributes().EnsureCapacity(pe.Labels.Len())
pe.Labels.Range(func(lb labels.Label) {
switch strings.ToLower(lb.Name) {
case prometheus.ExemplarTraceIDKey:
var tid [16]byte
err := decodeAndCopyToLowerBytes(tid[:], []byte(lb.Value))
if err == nil {
e.SetTraceID(tid)
} else {
e.FilteredAttributes().PutStr(lb.Name, lb.Value)
}
case prometheus.ExemplarSpanIDKey:
var sid [8]byte
err := decodeAndCopyToLowerBytes(sid[:], []byte(lb.Value))
if err == nil {
e.SetSpanID(sid)
} else {
e.FilteredAttributes().PutStr(lb.Name, lb.Value)
}
default:
e.FilteredAttributes().PutStr(lb.Name, lb.Value)
}
})
}
/*
decodeAndCopyToLowerBytes copies src to dst on lower bytes instead of higher
1. If len(src) > len(dst) -> copy first len(dst) bytes as it is. Example -> src = []byte{0xab,0xcd,0xef,0xgh,0xij}, dst = [2]byte, result dst = [2]byte{0xab, 0xcd}
2. If len(src) = len(dst) -> copy src to dst as it is
3. If len(src) < len(dst) -> prepend required 0s and then add src to dst. Example -> src = []byte{0xab, 0xcd}, dst = [8]byte, result dst = [8]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xab, 0xcd}
*/
func decodeAndCopyToLowerBytes(dst []byte, src []byte) error {
var err error
decodedLen := hex.DecodedLen(len(src))
if decodedLen >= len(dst) {
_, err = hex.Decode(dst, src[:hex.EncodedLen(len(dst))])
} else {
_, err = hex.Decode(dst[len(dst)-decodedLen:], src)
}
return err
}