metricbeat/helper/prometheus/metric.go (304 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package prometheus
import (
"fmt"
"math"
"strconv"
"strings"
"time"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/elastic-agent-libs/mapstr"
)
// MetricMap defines the mapping from Prometheus metric to a Metricbeat field
type MetricMap interface {
// GetOptions returns the list of metric options
GetOptions() []MetricOption
// GetField returns the resulting field name
GetField() string
// GetValue returns the resulting value
GetValue(m *OpenMetric) interface{}
// GetConfiguration returns the configuration for the metric
GetConfiguration() Configuration
}
// Configuration for mappings that needs extended treatment
type Configuration struct {
// StoreNonMappedLables indicates if labels found at the metric that are
// not found at the label map should be part of the resulting event.
// This setting should be used when the label name is not known beforehand
StoreNonMappedLabels bool
// NonMappedLabelsPlacement is used when StoreNonMappedLabels is set to true, and
// defines the key path at the event under which to store the dynamically found labels.
// This key path will be added to the events that match this metric along with a subset of
// key/value pairs will be created under it, one for each non mapped label found.
//
// Example:
//
// given a metric family in a prometheus resource in the form:
// metric1{label1="value1",label2="value2"} 1
// and not mapping labels but using this entry on a the MetriMap definition:
// "metric1": ExtendedInfoMetric(Configuration{StoreNonMappedLabels: true, NonMappedLabelsPlacement: "mypath"}),
// would output an event that contains a metricset field as follows
// "mypath": {"label1":"value1","label2":"value2"}
//
NonMappedLabelsPlacement string
// MetricProcessing options are a set of functions that will be
// applied to metrics after they are retrieved
MetricProcessingOptions []MetricOption
// ExtraFields is used to add fields to the
// event where this metric is included
ExtraFields mapstr.M
}
// MetricOption adds settings to Metric objects behavior
type MetricOption interface {
// Process a tuple of field, value and labels from a metric, return the same tuple updated
Process(field string, value interface{}, labels mapstr.M) (string, interface{}, mapstr.M)
}
// OpFilterMap only processes metrics matching the given filter
func OpFilterMap(label string, filterMap map[string]string) MetricOption {
return opFilterMap{
label: label,
filterMap: filterMap,
}
}
// OpLowercaseValue lowercases the value if it's a string
func OpLowercaseValue() MetricOption {
return opLowercaseValue{}
}
// OpUnixTimestampValue parses a value into a Unix timestamp
func OpUnixTimestampValue() MetricOption {
return opUnixTimestampValue{}
}
// OpMultiplyBuckets multiplies bucket labels in histograms, useful to change units
func OpMultiplyBuckets(multiplier float64) MetricOption {
return opMultiplyBuckets{
multiplier: multiplier,
}
}
// OpSetSuffix extends the field's name with the given suffix if the value of the metric
// is numeric (and not histogram or quantile), otherwise does nothing
func OpSetNumericMetricSuffix(suffix string) MetricOption {
return opSetNumericMetricSuffix{
suffix: suffix,
}
}
// Metric directly maps a Prometheus metric to a Metricbeat field
func Metric(field string, options ...MetricOption) MetricMap {
return &commonMetric{
field: field,
config: Configuration{MetricProcessingOptions: options},
}
}
// KeywordMetric maps a Prometheus metric to a Metricbeat field, stores the
// given keyword when source metric value is 1
func KeywordMetric(field, keyword string, options ...MetricOption) MetricMap {
return &keywordMetric{
commonMetric{
field: field,
config: Configuration{MetricProcessingOptions: options},
},
keyword,
}
}
// BooleanMetric maps a Prometheus metric to a Metricbeat field of bool type
func BooleanMetric(field string, options ...MetricOption) MetricMap {
return &booleanMetric{
commonMetric{
field: field,
config: Configuration{MetricProcessingOptions: options},
},
}
}
// LabelMetric maps a Prometheus metric to a Metricbeat field, stores the value
// of a given label on it if the gauge value is 1
func LabelMetric(field, label string, options ...MetricOption) MetricMap {
return &labelMetric{
commonMetric{
field: field,
config: Configuration{MetricProcessingOptions: options},
},
label,
}
}
// InfoMetric obtains info labels from the given metric and puts them
// into events matching all the key labels present in the metric
func InfoMetric(options ...MetricOption) MetricMap {
return &infoMetric{
commonMetric{
config: Configuration{MetricProcessingOptions: options},
},
}
}
// ExtendedInfoMetric obtains info labels from the given metric and puts them
// into events matching all the key labels present in the metric
func ExtendedInfoMetric(configuration Configuration) MetricMap {
return &infoMetric{
commonMetric{
config: configuration,
},
}
}
// ExtendedMetric is a metric item that allows extended behaviour
// through configuration
func ExtendedMetric(field string, configuration Configuration) MetricMap {
return &commonMetric{
field: field,
config: configuration,
}
}
type commonMetric struct {
field string
config Configuration
}
// GetOptions returns the list of metric options
func (m *commonMetric) GetOptions() []MetricOption {
return m.config.MetricProcessingOptions
}
// GetField returns the resulting field name
func (m *commonMetric) GetField() string {
return m.field
}
// GetConfiguration returns the configuration for the metric
func (m *commonMetric) GetConfiguration() Configuration {
return m.config
}
// GetValue returns the resulting value
func (m *commonMetric) GetValue(metric *OpenMetric) interface{} {
counter := metric.GetCounter()
if counter != nil {
if !math.IsNaN(counter.GetValue()) && !math.IsInf(counter.GetValue(), 0) {
return int64(counter.GetValue())
}
}
gauge := metric.GetGauge()
if gauge != nil {
if !math.IsNaN(gauge.GetValue()) && !math.IsInf(gauge.GetValue(), 0) {
return gauge.GetValue()
}
}
summary := metric.GetSummary()
if summary != nil {
value := mapstr.M{}
if !math.IsNaN(summary.GetSampleSum()) && !math.IsInf(summary.GetSampleSum(), 0) {
value["sum"] = summary.GetSampleSum()
value["count"] = summary.GetSampleCount()
}
quantiles := summary.GetQuantile()
percentileMap := mapstr.M{}
for _, quantile := range quantiles {
if !math.IsNaN(quantile.GetValue()) && !math.IsInf(quantile.GetValue(), 0) {
key := strconv.FormatFloat(100*quantile.GetQuantile(), 'f', -1, 64)
percentileMap[key] = quantile.GetValue()
}
}
if len(percentileMap) != 0 {
value["percentile"] = percentileMap
}
return value
}
histogram := metric.GetHistogram()
if histogram != nil {
value := mapstr.M{}
if !math.IsNaN(histogram.GetSampleSum()) && !math.IsInf(histogram.GetSampleSum(), 0) {
value["sum"] = histogram.GetSampleSum()
value["count"] = histogram.GetSampleCount()
}
buckets := histogram.GetBucket()
bucketMap := mapstr.M{}
for _, bucket := range buckets {
if bucket.GetCumulativeCount() != uint64(math.NaN()) && bucket.GetCumulativeCount() != uint64(math.Inf(0)) {
key := strconv.FormatFloat(bucket.GetUpperBound(), 'f', -1, 64)
bucketMap[key] = bucket.GetCumulativeCount()
}
}
if len(bucketMap) != 0 {
value["bucket"] = bucketMap
}
return value
}
// Other types are not supported here
return nil
}
type keywordMetric struct {
commonMetric
keyword string
}
// GetValue returns the resulting value
func (m *keywordMetric) GetValue(metric *OpenMetric) interface{} {
if gauge := metric.GetGauge(); gauge != nil && gauge.GetValue() == 1 {
return m.keyword
}
return nil
}
type booleanMetric struct {
commonMetric
}
// GetValue returns the resulting value
func (m *booleanMetric) GetValue(metric *OpenMetric) interface{} {
if gauge := metric.GetGauge(); gauge != nil {
return gauge.GetValue() == 1
}
return nil
}
type labelMetric struct {
commonMetric
label string
}
// GetValue returns the resulting value
func (m *labelMetric) GetValue(metric *OpenMetric) interface{} {
if gauge := metric.GetGauge(); gauge != nil && gauge.GetValue() == 1 {
return getLabel(metric, m.label)
}
return nil
}
func getLabel(metric *OpenMetric, name string) string {
for _, label := range metric.GetLabel() {
if label.Name == name {
return label.Value
}
}
return ""
}
type infoMetric struct {
commonMetric
}
// GetValue returns the resulting value
func (m *infoMetric) GetValue(metric *OpenMetric) interface{} {
return ""
}
// GetField returns the resulting field name
func (m *infoMetric) GetField() string {
return ""
}
type opFilterMap struct {
label string
filterMap map[string]string
}
// Called by the Prometheus helper to apply extra options on retrieved metrics
// Check whether the value of the specified label is allowed and, if yes, return the metric via the specified mapped field
// Else, if the specified label does not match the filter, return nil
// This is useful in cases where multiple Metricbeat fields need to be defined per Prometheus metric, based on label values
func (o opFilterMap) Process(field string, value interface{}, labels mapstr.M) (string, interface{}, mapstr.M) {
for k, v := range o.filterMap {
if labels[o.label] == k {
if field == "" {
return v, value, labels
} else {
return fmt.Sprintf("%v.%v", field, v), value, labels
}
}
}
return "", nil, nil
}
type opLowercaseValue struct{}
// Process will lowercase the given value if it's a string
func (o opLowercaseValue) Process(field string, value interface{}, labels mapstr.M) (string, interface{}, mapstr.M) {
if val, ok := value.(string); ok {
value = strings.ToLower(val)
}
return field, value, labels
}
type opMultiplyBuckets struct {
multiplier float64
}
// Process will multiply the bucket labels if it is an histogram with numeric labels
func (o opMultiplyBuckets) Process(field string, value interface{}, labels mapstr.M) (string, interface{}, mapstr.M) {
histogram, ok := value.(mapstr.M)
if !ok {
return field, value, labels
}
bucket, ok := histogram["bucket"].(mapstr.M)
if !ok {
return field, value, labels
}
sum, ok := histogram["sum"].(float64)
if !ok {
return field, value, labels
}
multiplied := mapstr.M{}
for k, v := range bucket {
if f, err := strconv.ParseFloat(k, 64); err == nil {
key := strconv.FormatFloat(f*o.multiplier, 'f', -1, 64)
multiplied[key] = v
} else {
multiplied[k] = v
}
}
histogram["bucket"] = multiplied
histogram["sum"] = sum * o.multiplier
return field, histogram, labels
}
type opSetNumericMetricSuffix struct {
suffix string
}
// Process will extend the field's name with the given suffix
func (o opSetNumericMetricSuffix) Process(field string, value interface{}, labels mapstr.M) (string, interface{}, mapstr.M) {
_, ok := value.(float64)
if !ok {
return field, value, labels
}
field = fmt.Sprintf("%v.%v", field, o.suffix)
return field, value, labels
}
type opUnixTimestampValue struct {
}
// Process converts a value in seconds into an unix time
func (o opUnixTimestampValue) Process(field string, value interface{}, labels mapstr.M) (string, interface{}, mapstr.M) {
return field, common.Time(time.Unix(int64(value.(float64)), 0)), labels
}
// OpLabelKeyPrefixRemover removes prefix from label keys
func OpLabelKeyPrefixRemover(prefix string) MetricOption {
return opLabelKeyPrefixRemover{prefix}
}
// opLabelKeyPrefixRemover is a metric option processor that removes a prefix from the key of a label set
type opLabelKeyPrefixRemover struct {
Prefix string
}
// Process modifies the labels map, removing a prefix when found at keys of the labels set.
// For each label, if the key is found a new key will be created hosting the same value and the
// old key will be deleted.
// Fields, values and not prefixed labels will remain unmodified.
func (o opLabelKeyPrefixRemover) Process(field string, value interface{}, labels mapstr.M) (string, interface{}, mapstr.M) {
renameKeys := []string{}
for k := range labels {
if len(k) < len(o.Prefix) {
continue
}
if k[:6] == o.Prefix {
renameKeys = append(renameKeys, k)
}
}
for i := range renameKeys {
v := labels[renameKeys[i]]
delete(labels, renameKeys[i])
labels[renameKeys[i][len(o.Prefix):]] = v
}
return "", value, labels
}