module/apmprometheus/gatherer.go (102 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package apmprometheus // import "go.elastic.co/apm/module/apmprometheus/v2"
import (
"context"
"strconv"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"go.elastic.co/apm/v2"
)
// Wrap adapts the Prometheus gatherer g so that it can be used as an
// apm.MetricsGatherer. The returned value reads from g each time its
// GatherMetrics method is called.
func Wrap(g prometheus.Gatherer) apm.MetricsGatherer {
	return gatherer{p: g}
}
// gatherer adapts a prometheus.Gatherer to the apm.MetricsGatherer
// interface. Values are created by Wrap; the conversion itself is
// implemented by the GatherMetrics method.
type gatherer struct {
// p is the wrapped Prometheus gatherer that metrics are read from.
p prometheus.Gatherer
}
// GatherMetrics gathers metrics from the prometheus.Gatherer g.p,
// and adds them to out.
//
// Metric families are translated as follows:
//
//   - Counters, gauges and untyped metrics are added under the family
//     name with their current value. The single-valued "go_info" gauge
//     is skipped.
//   - Summaries are expanded into "<name>.count", "<name>.total" and one
//     "<name>.percentile.<N>" metric per quantile, where N is the
//     quantile expressed as a whole percentage.
//   - Histograms with at least one observation are added as bucket
//     midpoints with non-cumulative counts; see histogramValues.
//   - Any other metric type is ignored.
//
// ctx is not consulted: prometheus.Gatherer.Gather does not accept a
// context. If Gather returns an error, nothing is added to out and the
// error is returned with a stack trace attached.
func (g gatherer) GatherMetrics(ctx context.Context, out *apm.Metrics) error {
	// percentileEpsilon absorbs binary floating point error when a
	// quantile is converted to a whole percentage. Without it, a
	// quantile such as 0.29 yields 28.999999999999996 and would be
	// reported as "percentile.28". It is small enough that sub-percent
	// quantiles are still truncated exactly as before (0.999 -> 99).
	const percentileEpsilon = 1e-9

	metricFamilies, err := g.p.Gather()
	if err != nil {
		return errors.WithStack(err)
	}
	for _, mf := range metricFamilies {
		name := mf.GetName()
		switch mf.GetType() {
		case dto.MetricType_COUNTER:
			for _, m := range mf.GetMetric() {
				v := m.GetCounter().GetValue()
				out.Add(name, makeLabels(m.GetLabel()), v)
			}
		case dto.MetricType_GAUGE:
			metrics := mf.GetMetric()
			if name == "go_info" && len(metrics) == 1 && metrics[0].GetGauge().GetValue() == 1 {
				// Ignore the "go_info" metric from the
				// built-in GoCollector, as we provide
				// the same information in the payload.
				continue
			}
			for _, m := range metrics {
				v := m.GetGauge().GetValue()
				out.Add(name, makeLabels(m.GetLabel()), v)
			}
		case dto.MetricType_UNTYPED:
			for _, m := range mf.GetMetric() {
				v := m.GetUntyped().GetValue()
				out.Add(name, makeLabels(m.GetLabel()), v)
			}
		case dto.MetricType_SUMMARY:
			for _, m := range mf.GetMetric() {
				s := m.GetSummary()
				labels := makeLabels(m.GetLabel())
				out.Add(name+".count", labels, float64(s.GetSampleCount()))
				out.Add(name+".total", labels, float64(s.GetSampleSum()))
				for _, q := range s.GetQuantile() {
					p := int(q.GetQuantile()*100 + percentileEpsilon)
					out.Add(name+".percentile."+strconv.Itoa(p), labels, q.GetValue())
				}
			}
		case dto.MetricType_HISTOGRAM:
			for _, m := range mf.GetMetric() {
				h := m.GetHistogram()
				// Histograms without any observations
				// are not reported at all.
				if h.GetSampleCount() == 0 {
					continue
				}
				labels := makeLabels(m.GetLabel())
				midpoints, counts := histogramValues(h)
				out.AddHistogram(name, labels, midpoints, counts)
			}
		default:
			// Other metric types are not translated.
		}
	}
	return nil
}

// histogramValues converts the cumulative buckets of h into parallel
// slices of bucket midpoints and non-cumulative per-bucket counts.
// Buckets that end up with a zero count are omitted.
//
// For the bucket values, we follow the approach described by Prometheus's
// histogram_quantile function (https://prometheus.io/docs/prometheus/latest/querying/functions/#histogram_quantile)
// to achieve consistent percentile aggregation results:
//
// "The histogram_quantile() function interpolates quantile values by assuming a linear
// distribution within a bucket. (...) If a quantile is located in the highest bucket,
// the upper bound of the second highest bucket is returned. A lower limit of the lowest
// bucket is assumed to be 0 if the upper bound of that bucket is greater than 0. In that
// case, the usual linear interpolation is applied within that bucket. Otherwise, the upper
// bound of the lowest bucket is returned for quantiles located in the lowest bucket."
func histogramValues(h *dto.Histogram) (midpoints []float64, counts []uint64) {
	// maxFiniteBound is the largest finite float64 value (the value of
	// math.MaxFloat64); only +Inf compares greater than it.
	const maxFiniteBound = 1.7976931348623157e308

	buckets := h.GetBucket()

	// The +Inf bucket is normally not encoded into the protobuf
	// representation, and is inferred below from the histogram's
	// SampleCount. If a producer does encode it explicitly
	// (NOTE(review): client_golang appears to do so when the +Inf
	// bucket carries an exemplar -- confirm), drop it here so that it
	// goes through the same inference instead of producing an infinite
	// midpoint.
	if n := len(buckets); n > 0 && buckets[n-1].GetUpperBound() > maxFiniteBound {
		buckets = buckets[:n-1]
	}

	// One extra slot for the inferred +Inf bucket.
	midpoints = make([]float64, 0, len(buckets)+1)
	counts = make([]uint64, 0, len(buckets)+1)

	// bucketCount is the number of observations accounted for by the
	// finite buckets.
	var bucketCount uint64
	for i, b := range buckets {
		count := b.GetCumulativeCount()
		le := b.GetUpperBound()
		if i == 0 {
			// The lowest bucket is assumed to start at 0
			// when its upper bound is positive.
			if le > 0 {
				le /= 2
			}
		} else {
			// apm-server expects non-cumulative counts.
			// prometheus counts each bucket cumulatively,
			// ie. bucketN contains all counts for bucketN
			// and all counts in preceding buckets. To get
			// the current bucket's count we subtract
			// bucketN-1 from bucketN, when N>0.
			prev := buckets[i-1]
			if prevCount := prev.GetCumulativeCount(); count > prevCount {
				count -= prevCount
			} else {
				// Also covers inconsistent input where
				// the cumulative count decreases, which
				// would otherwise wrap around.
				count = 0
			}
			le = prev.GetUpperBound() + (le-prev.GetUpperBound())/2.0
		}
		// We are excluding zero-count prometheus buckets.
		// The cumulative count may have initially been
		// non-zero, but after subtracting the preceding
		// bucket it may end up having a zero count.
		if count == 0 {
			continue
		}
		bucketCount += count
		counts = append(counts, count)
		midpoints = append(midpoints, le)
	}

	// Check if there were observations that fell outside of the
	// defined histogram buckets; if so, add an additional bucket
	// holding them, using the final defined upper bound as its value.
	// Comparing before subtracting avoids unsigned wrap-around if the
	// buckets account for more observations than SampleCount reports.
	if totalCount := h.GetSampleCount(); totalCount > bucketCount && len(buckets) > 0 {
		midpoints = append(midpoints, buckets[len(buckets)-1].GetUpperBound())
		counts = append(counts, totalCount-bucketCount)
	}
	return midpoints, counts
}
// makeLabels translates Prometheus label pairs into apm.MetricLabel values,
// preserving their order. The result is always non-nil, and is empty when
// lps is empty.
func makeLabels(lps []*dto.LabelPair) []apm.MetricLabel {
	converted := make([]apm.MetricLabel, 0, len(lps))
	for _, pair := range lps {
		label := apm.MetricLabel{
			Name:  pair.GetName(),
			Value: pair.GetValue(),
		}
		converted = append(converted, label)
	}
	return converted
}