// collector/receiver/prometheusreceiver/internal/transaction.go

// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "github.com/GoogleCloudPlatform/run-gmp-sidecar/collector/receiver/prometheusreceiver/internal"

import (
"context"
"errors"
"fmt"
"sort"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.uber.org/zap"
)
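
// targetMetricName is the name of the Prometheus-generated target_info series.
// Its labels describe the scrape target and are folded into resource attributes
// instead of being emitted as a metric (see AddTargetInfo).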
const (
targetMetricName = "target_info"
)
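
// transaction implements storage.Appender for a single scrape. Samples are
// accumulated into metric families and only converted to pdata and pushed to
// the sink when Commit is called.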
type transaction struct {
isNew bool
trimSuffixes bool
ctx context.Context
families map[string]*metricFamily
mc scrape.MetricMetadataStore
sink consumer.Metrics
externalLabels labels.Labels
nodeResource pcommon.Resource
logger *zap.Logger
metricAdjuster MetricsAdjuster
obsrecv *receiverhelper.ObsReport
// Used as buffer to calculate series ref hash.
bufBytes []byte
}
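
// newTransaction creates a transaction for one scrape, wiring in the metrics
// adjuster, the downstream sink, external labels, and observability reporting.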
func newTransaction(
ctx context.Context,
metricAdjuster MetricsAdjuster,
sink consumer.Metrics,
externalLabels labels.Labels,
settings receiver.Settings,
obsrecv *receiverhelper.ObsReport,
trimSuffixes bool) *transaction {
return &transaction{
ctx: ctx,
families: make(map[string]*metricFamily),
isNew: true,
trimSuffixes: trimSuffixes,
sink: sink,
metricAdjuster: metricAdjuster,
externalLabels: externalLabels,
logger: settings.Logger,
obsrecv: obsrecv,
bufBytes: make([]byte, 0, 1024),
}
}

// Append implements storage.Appender. It always returns 0 as the series
// reference to disable label caching.
func (t *transaction) Append(ref storage.SeriesRef, ls labels.Labels, atMs int64, val float64) (storage.SeriesRef, error) {
select {
case <-t.ctx.Done():
return 0, errTransactionAborted
default:
}
if len(t.externalLabels) != 0 {
ls = append(ls, t.externalLabels...)
sort.Sort(ls)
}
if t.isNew {
if err := t.initTransaction(ls); err != nil {
return 0, err
}
}
// Any datapoint with duplicate labels MUST be rejected per:
// * https://github.com/open-telemetry/wg-prometheus/issues/44
// * https://github.com/open-telemetry/opentelemetry-collector/issues/3407
// Prometheus itself rejects such samples as of version 2.16.0, released on 2020-02-13.
if dupLabel, hasDup := ls.HasDuplicateLabelNames(); hasDup {
return 0, fmt.Errorf("invalid sample: non-unique label names: %q", dupLabel)
}
metricName := ls.Get(model.MetricNameLabel)
if metricName == "" {
return 0, errMetricNameNotFound
}
// See https://www.prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series
// up: 1 if the instance is healthy, i.e. reachable, or 0 if the scrape failed.
// But it can also be a staleNaN, which is inserted when the target goes away.
if metricName == scrapeUpMetricName && val != 1.0 && !value.IsStaleNaN(val) {
if val == 0.0 {
t.logger.Warn("Failed to scrape Prometheus endpoint",
zap.Int64("scrape_timestamp", atMs),
zap.Stringer("target_labels", ls))
} else {
t.logger.Warn("The 'up' metric contains an invalid value",
zap.Float64("value", val),
zap.Int64("scrape_timestamp", atMs),
zap.Stringer("target_labels", ls))
}
}
// The target_info metric is converted into resource attributes rather than emitted as a regular metric.
if metricName == targetMetricName {
return 0, t.AddTargetInfo(ls)
}
curMF := t.getOrCreateMetricFamily(metricName)
return 0, curMF.addSeries(t.getSeriesRef(ls, curMF.mtype), metricName, ls, atMs, val)
}
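
// getOrCreateMetricFamily returns the metric family that the given metric name
// belongs to, creating it if needed. When no metadata is known for the exact
// name, the name is normalized (known suffixes such as _sum, _count, and
// _bucket are trimmed) so that the component series of summaries and
// histograms are grouped into a single family.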
func (t *transaction) getOrCreateMetricFamily(mn string) *metricFamily {
curMf, ok := t.families[mn]
if !ok {
fn := mn
if _, ok := t.mc.GetMetadata(mn); !ok {
fn = normalizeMetricName(mn)
}
if mf, ok := t.families[fn]; ok && mf.includesMetric(mn) {
curMf = mf
} else {
curMf = newMetricFamily(mn, t.mc, t.logger)
t.families[curMf.name] = curMf
}
}
return curMf
}
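
// AppendExemplar implements storage.Appender. The exemplar is attached to the
// matching series in its metric family; the returned series reference is
// always 0.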
func (t *transaction) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
select {
case <-t.ctx.Done():
return 0, errTransactionAborted
default:
}
if t.isNew {
if err := t.initTransaction(l); err != nil {
return 0, err
}
}
l = l.WithoutEmpty()
if dupLabel, hasDup := l.HasDuplicateLabelNames(); hasDup {
return 0, fmt.Errorf("invalid sample: non-unique label names: %q", dupLabel)
}
mn := l.Get(model.MetricNameLabel)
if mn == "" {
return 0, errMetricNameNotFound
}
mf := t.getOrCreateMetricFamily(mn)
mf.addExemplar(t.getSeriesRef(l, mf.mtype), e)
return 0, nil
}
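
// AppendHistogram implements storage.Appender for native histograms. It is not
// implemented yet, so native histogram samples are silently dropped.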
func (t *transaction) AppendHistogram(ref storage.SeriesRef, l labels.Labels, atMs int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
//TODO: implement this func
return 0, nil
}
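
// AppendCTZeroSample implements storage.Appender for created-timestamp zero
// samples. It is not implemented yet.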
func (t *transaction) AppendCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, _ int64) (storage.SeriesRef, error) {
//TODO: implement this func
return 0, nil
}
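
// getSeriesRef hashes the labels, skipping labels that are not useful for the
// given metric type, and reuses the transaction's scratch buffer between calls.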
func (t *transaction) getSeriesRef(ls labels.Labels, mtype pmetric.MetricType) uint64 {
var hash uint64
hash, t.bufBytes = getSeriesRef(t.bufBytes, ls, mtype)
return hash
}

// getMetrics builds a pmetric.Metrics payload from the accumulated metric
// families, attached to the given resource.
// The only error returned by this function is errNoDataToBuild.
func (t *transaction) getMetrics(resource pcommon.Resource) (pmetric.Metrics, error) {
if len(t.families) == 0 {
return pmetric.Metrics{}, errNoDataToBuild
}
md := pmetric.NewMetrics()
rms := md.ResourceMetrics().AppendEmpty()
resource.CopyTo(rms.Resource())
metrics := rms.ScopeMetrics().AppendEmpty().Metrics()
for _, mf := range t.families {
mf.appendMetric(metrics, t.trimSuffixes)
}
return md, nil
}
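
// initTransaction resolves the scrape target and the metric metadata store from
// the context and builds the node resource from the job and instance labels.
// It runs lazily on the first appended sample.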
func (t *transaction) initTransaction(labels labels.Labels) error {
target, ok := scrape.TargetFromContext(t.ctx)
if !ok {
return errors.New("unable to find target in context")
}
t.mc, ok = scrape.MetricMetadataStoreFromContext(t.ctx)
if !ok {
return errors.New("unable to find MetricMetadataStore in context")
}
job, instance := labels.Get(model.JobLabel), labels.Get(model.InstanceLabel)
if job == "" || instance == "" {
return errNoJobInstance
}
t.nodeResource = CreateResource(job, instance, target.DiscoveredLabels())
t.isNew = false
return nil
}
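
// Commit implements storage.Appender. It converts the accumulated metric
// families to pdata, runs the metrics adjuster, and forwards the result to the
// sink, recording the operation via the receiver's ObsReport.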
func (t *transaction) Commit() error {
if t.isNew {
return nil
}
ctx := t.obsrecv.StartMetricsOp(t.ctx)
md, err := t.getMetrics(t.nodeResource)
if err != nil {
t.obsrecv.EndMetricsOp(ctx, dataformat, 0, err)
return err
}
numPoints := md.DataPointCount()
if numPoints == 0 {
return nil
}
if err = t.metricAdjuster.AdjustMetrics(md); err != nil {
t.obsrecv.EndMetricsOp(ctx, dataformat, numPoints, err)
return err
}
err = t.sink.ConsumeMetrics(ctx, md)
t.obsrecv.EndMetricsOp(ctx, dataformat, numPoints, err)
return err
}
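
// Rollback implements storage.Appender. Nothing is forwarded to the sink before
// Commit, so there is no state to undo.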
func (t *transaction) Rollback() error {
return nil
}
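
// UpdateMetadata implements storage.Appender. It is not implemented yet, so
// metadata updates are ignored.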
func (t *transaction) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
//TODO: implement this func
return 0, nil
}
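
// AddTargetInfo copies the labels of the target_info metric, except job,
// instance, and the metric name, onto the node resource attributes.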
func (t *transaction) AddTargetInfo(labels labels.Labels) error {
attrs := t.nodeResource.Attributes()
for _, lbl := range labels {
if lbl.Name == model.JobLabel || lbl.Name == model.InstanceLabel || lbl.Name == model.MetricNameLabel {
continue
}
attrs.PutStr(lbl.Name, lbl.Value)
}
return nil
}
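
// getSeriesRef hashes the labels into a series identity, ignoring labels that
// do not distinguish series for the given metric type (for example "le" on
// histograms), and returns the hash together with the reused buffer.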
func getSeriesRef(bytes []byte, ls labels.Labels, mtype pmetric.MetricType) (uint64, []byte) {
return ls.HashWithoutLabels(bytes, getSortedNotUsefulLabels(mtype)...)
}