otelcollector/prometheusreceiver/internal/transaction.go
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"

import (
"context"
"errors"
"fmt"
"math"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
mdata "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal/metadata"
)

var removeStartTimeAdjustment = featuregate.GlobalRegistry().MustRegister(
"receiver.prometheusreceiver.RemoveStartTimeAdjustment",
featuregate.StageAlpha,
featuregate.WithRegisterDescription("When enabled, the Prometheus receiver will"+
" leave the start time unset. Use the new metricstarttime processor instead."),
)
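
// resourceKey identifies the resource a scraped series belongs to, keyed by
// the target's job and instance labels.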
type resourceKey struct {
job string
instance string
}
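
// transaction buffers the samples, exemplars, native histograms, and metadata
// appended during a single scrape and converts them into pmetric.Metrics when
// Commit is called.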
type transaction struct {
isNew bool
trimSuffixes bool
enableNativeHistograms bool
ctx context.Context
families map[resourceKey]map[scopeID]map[string]*metricFamily
mc scrape.MetricMetadataStore
sink consumer.Metrics
externalLabels labels.Labels
nodeResources map[resourceKey]pcommon.Resource
scopeAttributes map[resourceKey]map[scopeID]pcommon.Map
logger *zap.Logger
buildInfo component.BuildInfo
metricAdjuster MetricsAdjuster
obsrecv *receiverhelper.ObsReport
	// Used as a buffer to calculate the series ref hash.
bufBytes []byte
}
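
// emptyScopeID is the zero scope, used when a series carries no
// otel_scope_name or otel_scope_version labels.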
var emptyScopeID scopeID
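
// scopeID identifies an instrumentation scope by name and version.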
type scopeID struct {
name string
version string
}
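
// newTransaction creates a transaction for a single scrape using the given
// metric adjuster, sink, external labels, and receiver settings.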
func newTransaction(
ctx context.Context,
metricAdjuster MetricsAdjuster,
sink consumer.Metrics,
externalLabels labels.Labels,
settings receiver.Settings,
obsrecv *receiverhelper.ObsReport,
trimSuffixes bool,
enableNativeHistograms bool,
) *transaction {
return &transaction{
ctx: ctx,
families: make(map[resourceKey]map[scopeID]map[string]*metricFamily),
isNew: true,
trimSuffixes: trimSuffixes,
enableNativeHistograms: enableNativeHistograms,
sink: sink,
metricAdjuster: metricAdjuster,
externalLabels: externalLabels,
logger: settings.Logger,
buildInfo: settings.BuildInfo,
obsrecv: obsrecv,
bufBytes: make([]byte, 0, 1024),
scopeAttributes: make(map[resourceKey]map[scopeID]pcommon.Map),
nodeResources: map[resourceKey]pcommon.Resource{},
}
}

// Append adds a float sample for the series identified by ls to the transaction.
// It always returns 0 as the series reference to disable label caching.
func (t *transaction) Append(_ storage.SeriesRef, ls labels.Labels, atMs int64, val float64) (storage.SeriesRef, error) {
select {
case <-t.ctx.Done():
return 0, errTransactionAborted
default:
}
if t.externalLabels.Len() != 0 {
b := labels.NewBuilder(ls)
t.externalLabels.Range(func(l labels.Label) {
b.Set(l.Name, l.Value)
})
ls = b.Labels()
}
rKey, err := t.initTransaction(ls)
if err != nil {
return 0, err
}
// Any datapoint with duplicate labels MUST be rejected per:
// * https://github.com/open-telemetry/wg-prometheus/issues/44
// * https://github.com/open-telemetry/opentelemetry-collector/issues/3407
	// Prometheus itself has rejected such samples since version 2.16.0, released on 2020-02-13.
if dupLabel, hasDup := ls.HasDuplicateLabelNames(); hasDup {
return 0, fmt.Errorf("invalid sample: non-unique label names: %q", dupLabel)
}
metricName := ls.Get(model.MetricNameLabel)
if metricName == "" {
return 0, errMetricNameNotFound
}
// See https://www.prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series
// up: 1 if the instance is healthy, i.e. reachable, or 0 if the scrape failed.
// But it can also be a staleNaN, which is inserted when the target goes away.
if metricName == scrapeUpMetricName && val != 1.0 && !value.IsStaleNaN(val) {
if val == 0.0 {
t.logger.Warn("Failed to scrape Prometheus endpoint",
zap.Int64("scrape_timestamp", atMs),
zap.Stringer("target_labels", ls))
} else {
t.logger.Warn("The 'up' metric contains invalid value",
zap.Float64("value", val),
zap.Int64("scrape_timestamp", atMs),
zap.Stringer("target_labels", ls))
}
}
// For the `target_info` metric we need to convert it to resource attributes.
if metricName == prometheus.TargetInfoMetricName {
t.AddTargetInfo(*rKey, ls)
return 0, nil
}
// For the `otel_scope_info` metric we need to convert it to scope attributes.
if metricName == prometheus.ScopeInfoMetricName {
t.addScopeInfo(*rKey, ls)
return 0, nil
}
curMF, existing := t.getOrCreateMetricFamily(*rKey, getScopeID(ls), metricName)
if t.enableNativeHistograms && curMF.mtype == pmetric.MetricTypeExponentialHistogram {
		// If a histogram has both a classic and a native version, the native histogram is scraped
// first. Getting a float sample for the same series means that `scrape_classic_histogram`
// is set to true in the scrape config. In this case, we should ignore the native histogram.
curMF.mtype = pmetric.MetricTypeHistogram
}
seriesRef := t.getSeriesRef(ls, curMF.mtype)
err = curMF.addSeries(seriesRef, metricName, ls, atMs, val)
if err != nil {
// Handle special case of float sample indicating staleness of native
// histogram. This is similar to how Prometheus handles it, but we
// don't have access to the previous value so we're applying some
		// heuristics to figure out if this is a native histogram or not.
// The metric type will indicate histogram, but presumably there will be no
// _bucket, _count, _sum suffix or `le` label, which makes addSeries fail
// with errEmptyLeLabel.
if t.enableNativeHistograms && errors.Is(err, errEmptyLeLabel) && !existing && value.IsStaleNaN(val) && curMF.mtype == pmetric.MetricTypeHistogram {
mg := curMF.loadMetricGroupOrCreate(seriesRef, ls, atMs)
curMF.mtype = pmetric.MetricTypeExponentialHistogram
mg.mtype = pmetric.MetricTypeExponentialHistogram
_ = curMF.addExponentialHistogramSeries(seriesRef, metricName, ls, atMs, &histogram.Histogram{Sum: math.Float64frombits(value.StaleNaN)}, nil)
// ignore errors here, this is best effort.
} else {
t.logger.Warn("failed to add datapoint", zap.Error(err), zap.String("metric_name", metricName), zap.Any("labels", ls))
}
}
return 0, nil // never return errors, as that fails the whole scrape
}

// getOrCreateMetricFamily returns the metric family for the given metric name and scope,
// and true if an existing family was found.
func (t *transaction) getOrCreateMetricFamily(key resourceKey, scope scopeID, mn string) (*metricFamily, bool) {
if _, ok := t.families[key]; !ok {
t.families[key] = make(map[scopeID]map[string]*metricFamily)
}
if _, ok := t.families[key][scope]; !ok {
t.families[key][scope] = make(map[string]*metricFamily)
}
curMf, ok := t.families[key][scope][mn]
if !ok {
fn := mn
if _, ok := t.mc.GetMetadata(mn); !ok {
fn = normalizeMetricName(mn)
}
mf, ok := t.families[key][scope][fn]
if !ok || !mf.includesMetric(mn) {
curMf = newMetricFamily(mn, t.mc, t.logger)
t.families[key][scope][curMf.name] = curMf
return curMf, false
}
curMf = mf
}
return curMf, true
}
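
// AppendExemplar attaches an exemplar to the series identified by the given
// labels. It always returns 0 as the series reference.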
func (t *transaction) AppendExemplar(_ storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
select {
case <-t.ctx.Done():
return 0, errTransactionAborted
default:
}
rKey, err := t.initTransaction(l)
if err != nil {
return 0, err
}
l = l.WithoutEmpty()
if dupLabel, hasDup := l.HasDuplicateLabelNames(); hasDup {
return 0, fmt.Errorf("invalid sample: non-unique label names: %q", dupLabel)
}
mn := l.Get(model.MetricNameLabel)
if mn == "" {
return 0, errMetricNameNotFound
}
mf, _ := t.getOrCreateMetricFamily(*rKey, getScopeID(l), mn)
mf.addExemplar(t.getSeriesRef(l, mf.mtype), e)
return 0, nil
}
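
// AppendHistogram adds a native (exponential) histogram sample for the given
// series. It is a no-op unless native histogram support is enabled. Datapoint
// errors are logged rather than returned so they do not fail the whole scrape.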
func (t *transaction) AppendHistogram(_ storage.SeriesRef, ls labels.Labels, atMs int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if !t.enableNativeHistograms {
return 0, nil
}
select {
case <-t.ctx.Done():
return 0, errTransactionAborted
default:
}
if t.externalLabels.Len() != 0 {
b := labels.NewBuilder(ls)
t.externalLabels.Range(func(l labels.Label) {
b.Set(l.Name, l.Value)
})
ls = b.Labels()
}
rKey, err := t.initTransaction(ls)
if err != nil {
return 0, err
}
// Any datapoint with duplicate labels MUST be rejected per:
// * https://github.com/open-telemetry/wg-prometheus/issues/44
// * https://github.com/open-telemetry/opentelemetry-collector/issues/3407
	// Prometheus itself has rejected such samples since version 2.16.0, released on 2020-02-13.
if dupLabel, hasDup := ls.HasDuplicateLabelNames(); hasDup {
return 0, fmt.Errorf("invalid sample: non-unique label names: %q", dupLabel)
}
metricName := ls.Get(model.MetricNameLabel)
if metricName == "" {
return 0, errMetricNameNotFound
}
	// The `up`, `target_info`, and `otel_scope_info` metrics should never generate native histograms,
	// so, unlike in Append, we don't check for them here.
curMF, existing := t.getOrCreateMetricFamily(*rKey, getScopeID(ls), metricName)
if !existing {
curMF.mtype = pmetric.MetricTypeExponentialHistogram
} else if curMF.mtype != pmetric.MetricTypeExponentialHistogram {
// Already scraped as classic histogram.
return 0, nil
}
	if (h != nil && h.CounterResetHint == histogram.GaugeType) || (fh != nil && fh.CounterResetHint == histogram.GaugeType) {
t.logger.Warn("dropping unsupported gauge histogram datapoint", zap.String("metric_name", metricName), zap.Any("labels", ls))
}
err = curMF.addExponentialHistogramSeries(t.getSeriesRef(ls, curMF.mtype), metricName, ls, atMs, h, fh)
if err != nil {
t.logger.Warn("failed to add histogram datapoint", zap.Error(err), zap.String("metric_name", metricName), zap.Any("labels", ls))
}
return 0, nil // never return errors, as that fails the whole scrape
}
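
// AppendCTZeroSample records the created (start) timestamp ctMs reported for
// the series identified by ls.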
func (t *transaction) AppendCTZeroSample(_ storage.SeriesRef, ls labels.Labels, atMs, ctMs int64) (storage.SeriesRef, error) {
return t.setCreationTimestamp(ls, atMs, ctMs, false)
}
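
// AppendHistogramCTZeroSample records the created (start) timestamp ctMs
// reported for a native histogram series.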
func (t *transaction) AppendHistogramCTZeroSample(_ storage.SeriesRef, ls labels.Labels, atMs, ctMs int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) {
return t.setCreationTimestamp(ls, atMs, ctMs, true)
}
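
// setCreationTimestamp looks up (or creates) the metric family for the series
// and stores the created timestamp ctMs together with the sample timestamp atMs.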
func (t *transaction) setCreationTimestamp(ls labels.Labels, atMs, ctMs int64, histogram bool) (storage.SeriesRef, error) {
select {
case <-t.ctx.Done():
return 0, errTransactionAborted
default:
}
if t.externalLabels.Len() != 0 {
b := labels.NewBuilder(ls)
t.externalLabels.Range(func(l labels.Label) {
b.Set(l.Name, l.Value)
})
ls = b.Labels()
}
rKey, err := t.initTransaction(ls)
if err != nil {
return 0, err
}
// Any datapoint with duplicate labels MUST be rejected per:
// * https://github.com/open-telemetry/wg-prometheus/issues/44
// * https://github.com/open-telemetry/opentelemetry-collector/issues/3407
	// Prometheus itself has rejected such samples since version 2.16.0, released on 2020-02-13.
if dupLabel, hasDup := ls.HasDuplicateLabelNames(); hasDup {
return 0, fmt.Errorf("invalid sample: non-unique label names: %q", dupLabel)
}
metricName := ls.Get(model.MetricNameLabel)
if metricName == "" {
return 0, errMetricNameNotFound
}
curMF, existing := t.getOrCreateMetricFamily(*rKey, getScopeID(ls), metricName)
if histogram {
if !existing {
curMF.mtype = pmetric.MetricTypeExponentialHistogram
} else if curMF.mtype != pmetric.MetricTypeExponentialHistogram {
// Already scraped as classic histogram.
return 0, nil
}
}
seriesRef := t.getSeriesRef(ls, curMF.mtype)
curMF.addCreationTimestamp(seriesRef, ls, atMs, ctMs)
return storage.SeriesRef(seriesRef), nil
}
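
// SetOptions is not implemented yet.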
func (t *transaction) SetOptions(_ *storage.AppendOptions) {
// TODO: implement this func
}
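
// getSeriesRef returns the hash used as the series reference for the given
// labels and metric type, reusing t.bufBytes as the hashing buffer.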
func (t *transaction) getSeriesRef(ls labels.Labels, mtype pmetric.MetricType) uint64 {
var hash uint64
hash, t.bufBytes = getSeriesRef(t.bufBytes, ls, mtype)
return hash
}

// getMetrics builds a pmetric.Metrics from all metric families accumulated in the transaction.
// The only error returned by this function is errNoDataToBuild.
func (t *transaction) getMetrics() (pmetric.Metrics, error) {
if len(t.families) == 0 {
return pmetric.Metrics{}, errNoDataToBuild
}
md := pmetric.NewMetrics()
for rKey, families := range t.families {
if len(families) == 0 {
continue
}
resource, ok := t.nodeResources[rKey]
if !ok {
continue
}
rms := md.ResourceMetrics().AppendEmpty()
resource.CopyTo(rms.Resource())
for scope, mfs := range families {
ils := rms.ScopeMetrics().AppendEmpty()
// If metrics don't include otel_scope_name or otel_scope_version
// labels, use the receiver name and version.
if scope == emptyScopeID {
ils.Scope().SetName(mdata.ScopeName)
ils.Scope().SetVersion(t.buildInfo.Version)
} else {
// Otherwise, use the scope that was provided with the metrics.
ils.Scope().SetName(scope.name)
ils.Scope().SetVersion(scope.version)
// If we got an otel_scope_info metric for that scope, get scope
// attributes from it.
if scopeAttributes, ok := t.scopeAttributes[rKey]; ok {
if attributes, ok := scopeAttributes[scope]; ok {
attributes.CopyTo(ils.Scope().Attributes())
}
}
}
metrics := ils.Metrics()
for _, mf := range mfs {
mf.appendMetric(metrics, t.trimSuffixes)
}
}
}
// remove the resource if no metrics were added to avoid returning resources with empty data points
md.ResourceMetrics().RemoveIf(func(metrics pmetric.ResourceMetrics) bool {
if metrics.ScopeMetrics().Len() == 0 {
return true
}
remove := true
for i := 0; i < metrics.ScopeMetrics().Len(); i++ {
if metrics.ScopeMetrics().At(i).Metrics().Len() > 0 {
remove = false
break
}
}
return remove
})
return md, nil
}
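
// getScopeID returns the scope identified by the otel_scope_name and
// otel_scope_version labels, if present.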
func getScopeID(ls labels.Labels) scopeID {
var scope scopeID
ls.Range(func(lbl labels.Label) {
if lbl.Name == prometheus.ScopeNameLabelKey {
scope.name = lbl.Value
}
if lbl.Name == prometheus.ScopeVersionLabelKey {
scope.version = lbl.Value
}
})
return scope
}
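
// initTransaction resolves the scrape target and metric metadata store from
// the transaction context, derives the resource key from the labels, and
// creates the corresponding resource the first time the key is seen.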
func (t *transaction) initTransaction(labels labels.Labels) (*resourceKey, error) {
target, ok := scrape.TargetFromContext(t.ctx)
if !ok {
return nil, errors.New("unable to find target in context")
}
t.mc, ok = scrape.MetricMetadataStoreFromContext(t.ctx)
if !ok {
return nil, errors.New("unable to find MetricMetadataStore in context")
}
rKey, err := t.getJobAndInstance(labels)
if err != nil {
return nil, err
}
if _, ok := t.nodeResources[*rKey]; !ok {
t.nodeResources[*rKey] = CreateResource(rKey.job, rKey.instance, target.DiscoveredLabels())
}
t.isNew = false
return rKey, nil
}
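
// getJobAndInstance builds the resource key from the job and instance labels,
// falling back to the values of the scrape target associated with the
// transaction when either label is missing.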
func (t *transaction) getJobAndInstance(labels labels.Labels) (*resourceKey, error) {
// first, try to get job and instance from the labels
job, instance := labels.Get(model.JobLabel), labels.Get(model.InstanceLabel)
if job != "" && instance != "" {
return &resourceKey{
job: job,
instance: instance,
}, nil
}
// if not available in the labels, try to fall back to the scrape job associated
// with the transaction.
// this can be the case for, e.g., aggregated metrics coming from a federate endpoint
// that represent the whole cluster, rather than an individual workload.
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32555 for reference
if target, ok := scrape.TargetFromContext(t.ctx); ok {
if job == "" {
job = target.GetValue(model.JobLabel)
}
if instance == "" {
instance = target.GetValue(model.InstanceLabel)
}
if job != "" && instance != "" {
return &resourceKey{
job: job,
instance: instance,
}, nil
}
}
return nil, errNoJobInstance
}
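
// Commit builds the accumulated metrics, adjusts start timestamps unless the
// removeStartTimeAdjustment feature gate is enabled, and forwards the result
// to the downstream consumer. It is a no-op if nothing was appended.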
func (t *transaction) Commit() error {
if t.isNew {
return nil
}
ctx := t.obsrecv.StartMetricsOp(t.ctx)
md, err := t.getMetrics()
if err != nil {
t.obsrecv.EndMetricsOp(ctx, dataformat, 0, err)
return err
}
numPoints := md.DataPointCount()
if numPoints == 0 {
return nil
}
if !removeStartTimeAdjustment.IsEnabled() {
if err = t.metricAdjuster.AdjustMetrics(md); err != nil {
t.obsrecv.EndMetricsOp(ctx, dataformat, numPoints, err)
return err
}
}
err = t.sink.ConsumeMetrics(ctx, md)
t.obsrecv.EndMetricsOp(ctx, dataformat, numPoints, err)
return err
}
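
// Rollback is a no-op; metrics are only emitted on Commit.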
func (t *transaction) Rollback() error {
return nil
}
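
// UpdateMetadata is not implemented yet; metadata updates are ignored.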
func (t *transaction) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _ metadata.Metadata) (storage.SeriesRef, error) {
// TODO: implement this func
return 0, nil
}
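
// AddTargetInfo copies the labels of a target_info sample to the resource
// attributes of the corresponding resource, skipping the job, instance, and
// metric name labels.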
func (t *transaction) AddTargetInfo(key resourceKey, ls labels.Labels) {
if resource, ok := t.nodeResources[key]; ok {
attrs := resource.Attributes()
ls.Range(func(lbl labels.Label) {
if lbl.Name == model.JobLabel || lbl.Name == model.InstanceLabel || lbl.Name == model.MetricNameLabel {
return
}
attrs.PutStr(lbl.Name, lbl.Value)
})
}
}
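
// addScopeInfo stores the labels of an otel_scope_info sample as scope
// attributes for the scope identified by the otel_scope_name and
// otel_scope_version labels.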
func (t *transaction) addScopeInfo(key resourceKey, ls labels.Labels) {
attrs := pcommon.NewMap()
scope := scopeID{}
ls.Range(func(lbl labels.Label) {
if lbl.Name == model.JobLabel || lbl.Name == model.InstanceLabel || lbl.Name == model.MetricNameLabel {
return
}
if lbl.Name == prometheus.ScopeNameLabelKey {
scope.name = lbl.Value
return
}
if lbl.Name == prometheus.ScopeVersionLabelKey {
scope.version = lbl.Value
return
}
attrs.PutStr(lbl.Name, lbl.Value)
})
if _, ok := t.scopeAttributes[key]; !ok {
t.scopeAttributes[key] = make(map[scopeID]pcommon.Map)
}
t.scopeAttributes[key][scope] = attrs
}
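
// getSeriesRef hashes the labels into a series reference, excluding labels
// that are not useful for identifying series of the given metric type. The
// byte slice is reused as the hashing buffer and returned for reuse.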
func getSeriesRef(bytes []byte, ls labels.Labels, mtype pmetric.MetricType) (uint64, []byte) {
return ls.HashWithoutLabels(bytes, getSortedNotUsefulLabels(mtype)...)
}