internal/telemetry/metric_exporter.go (270 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package telemetry
import (
"context"
"fmt"
"strconv"
"strings"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"github.com/elastic/apm-data/model/modelpb"
)
// NewMetricExporter initializes a new MetricExporter
// This export logic is heavily inspired from the OTLP input in apm-data.
// https://github.com/elastic/apm-data/blob/main/input/otlp/metrics.go
func NewMetricExporter(opts ...ConfigOption) *MetricExporter {
cfg := newConfig(opts...)
return &MetricExporter{
processor: cfg.processor,
metricFilter: cfg.MetricFilter,
temporalitySelector: cfg.TemporalitySelector,
aggregationSelector: cfg.AggregationSelector,
}
}
// MetricExporter is an OpenTelemetry metric Reader which retrieves metrics,
// filters them and emits them to the specified consumer
type MetricExporter struct {
processor modelpb.BatchProcessor
metricFilter []string
temporalitySelector metric.TemporalitySelector
aggregationSelector metric.AggregationSelector
}
// SetBatchProcessor sets a batch processor on the exporter
func (e *MetricExporter) SetBatchProcessor(p modelpb.BatchProcessor) {
e.processor = p
}
// Temporality returns the Temporality to use for an instrument kind.
func (e *MetricExporter) Temporality(k metric.InstrumentKind) metricdata.Temporality {
return e.temporalitySelector(k)
}
// Aggregation returns the Aggregation to use for an instrument kind.
func (e *MetricExporter) Aggregation(k metric.InstrumentKind) metric.Aggregation {
return e.aggregationSelector(k)
}
func (e *MetricExporter) Export(ctx context.Context, rm *metricdata.ResourceMetrics) error {
if e.processor == nil {
return nil
}
batch := modelpb.Batch{}
now := time.Now()
baseEvent := modelpb.APMEvent{
Service: &modelpb.Service{
Name: "apm-server",
Language: &modelpb.Language{Name: "go"},
},
Event: &modelpb.Event{
Received: modelpb.FromTime(now),
},
}
for _, scopeMetrics := range rm.ScopeMetrics {
ms := make(metricsets)
for _, sm := range scopeMetrics.Metrics {
if e.isMetricFiltered(sm.Name) {
continue
}
if err := addMetric(sm, ms); err != nil {
return err
}
}
for key, ms := range ms {
event := baseEvent.CloneVT()
event.Timestamp = modelpb.FromTime(key.timestamp)
metrs := make([]*modelpb.MetricsetSample, 0, len(ms.samples))
for _, s := range ms.samples {
metrs = append(metrs, s)
}
event.Metricset = &modelpb.Metricset{Samples: metrs, Name: "app"}
if ms.attributes.Len() > 0 {
event.Labels = modelpb.Labels{}
event.NumericLabels = modelpb.NumericLabels{}
iter := ms.attributes.Iter()
for iter.Next() {
_, kv := iter.IndexedAttribute()
setLabel(string(kv.Key), event, kv.Value.Emit())
}
if len(event.Labels) == 0 {
event.Labels = nil
}
if len(event.NumericLabels) == 0 {
event.NumericLabels = nil
}
}
batch = append(batch, event)
}
}
if len(batch) == 0 {
return nil
}
return e.processor.ProcessBatch(ctx, &batch)
}
func (e *MetricExporter) isMetricFiltered(n string) bool {
for _, v := range e.metricFilter {
if v == n {
return false
}
}
return true
}
func addMetric(sm metricdata.Metrics, ms metricsets) error {
switch m := sm.Data.(type) {
case metricdata.Histogram[int64]:
for _, dp := range m.DataPoints {
if hist := buildHistogram(dp); hist != nil {
sample := modelpb.MetricsetSample{
Name: sm.Name,
Type: modelpb.MetricType_METRIC_TYPE_HISTOGRAM,
Histogram: hist,
}
ms.upsert(dp.Time, dp.Attributes, &sample)
}
}
case metricdata.Histogram[float64]:
for _, dp := range m.DataPoints {
if hist := buildHistogram(dp); hist != nil {
sample := modelpb.MetricsetSample{
Name: sm.Name,
Type: modelpb.MetricType_METRIC_TYPE_HISTOGRAM,
Histogram: hist,
}
ms.upsert(dp.Time, dp.Attributes, &sample)
}
}
case metricdata.Sum[int64]:
for _, dp := range m.DataPoints {
sample := modelpb.MetricsetSample{
Name: sm.Name,
Type: modelpb.MetricType_METRIC_TYPE_COUNTER,
Value: float64(dp.Value),
}
ms.upsert(dp.Time, dp.Attributes, &sample)
}
case metricdata.Sum[float64]:
for _, dp := range m.DataPoints {
sample := modelpb.MetricsetSample{
Name: sm.Name,
Type: modelpb.MetricType_METRIC_TYPE_COUNTER,
Value: dp.Value,
}
ms.upsert(dp.Time, dp.Attributes, &sample)
}
case metricdata.Gauge[int64]:
for _, dp := range m.DataPoints {
sample := modelpb.MetricsetSample{
Name: sm.Name,
Type: modelpb.MetricType_METRIC_TYPE_GAUGE,
Value: float64(dp.Value),
}
ms.upsert(dp.Time, dp.Attributes, &sample)
}
case metricdata.Gauge[float64]:
for _, dp := range m.DataPoints {
sample := modelpb.MetricsetSample{
Name: sm.Name,
Type: modelpb.MetricType_METRIC_TYPE_GAUGE,
Value: dp.Value,
}
ms.upsert(dp.Time, dp.Attributes, &sample)
}
default:
return fmt.Errorf("unknown metric type %q", m)
}
return nil
}
func buildHistogram[T int64 | float64](dp metricdata.HistogramDataPoint[T]) *modelpb.Histogram {
if len(dp.BucketCounts) != len(dp.Bounds)+1 || len(dp.Bounds) == 0 {
return nil
}
bounds := make([]float64, 0, len(dp.Bounds))
counts := make([]uint64, 0, len(dp.BucketCounts))
for i := range dp.BucketCounts {
bound, count := computeCountAndBounds(i, dp.Bounds, dp.BucketCounts)
if count == 0 {
continue
}
counts = append(counts, count)
bounds = append(bounds, bound)
}
return &modelpb.Histogram{
Counts: counts,
Values: bounds,
}
}
func computeCountAndBounds(i int, bounds []float64, counts []uint64) (float64, uint64) {
count := counts[i]
if count == 0 {
return 0, 0
}
var bound float64
switch i {
// (-infinity, explicit_bounds[i]]
case 0:
bound = bounds[i]
if bound > 0 {
bound /= 2
}
// (explicit_bounds[i], +infinity)
case len(counts) - 1:
bound = bounds[i-1]
// [explicit_bounds[i-1], explicit_bounds[i])
default:
// Use the midpoint between the boundaries.
bound = bounds[i-1] + (bounds[i]-bounds[i-1])/2.0
}
return bound, count
}
func (e *MetricExporter) ForceFlush(ctx context.Context) error {
return ctx.Err()
}
func (e *MetricExporter) Shutdown(ctx context.Context) error {
return nil
}
func setLabel(key string, event *modelpb.APMEvent, v interface{}) {
switch v := v.(type) {
case string:
modelpb.Labels(event.Labels).Set(key, v)
case bool:
modelpb.Labels(event.Labels).Set(key, strconv.FormatBool(v))
case float64:
modelpb.NumericLabels(event.NumericLabels).Set(key, v)
case int64:
modelpb.NumericLabels(event.NumericLabels).Set(key, float64(v))
case []interface{}:
if len(v) == 0 {
return
}
switch v[0].(type) {
case string:
value := make([]string, len(v))
for i := range v {
value[i] = v[i].(string)
}
modelpb.Labels(event.Labels).SetSlice(key, value)
case float64:
value := make([]float64, len(v))
for i := range v {
value[i] = v[i].(float64)
}
modelpb.NumericLabels(event.NumericLabels).SetSlice(key, value)
}
}
}
type metricsets map[metricsetKey]metricset
type metricsetKey struct {
timestamp time.Time
signature string // combination of all attributes
}
type metricset struct {
attributes attribute.Set
samples map[string]*modelpb.MetricsetSample
}
// upsert searches for an existing metricset with the given timestamp and labels,
// and appends the sample to it. If there is no such existing metricset, a new one
// is created.
func (ms metricsets) upsert(timestamp time.Time, attributes attribute.Set, sample *modelpb.MetricsetSample) {
// We always record metrics as they are given. We also copy some
// well-known OpenTelemetry metrics to their Elastic APM equivalents.
ms.upsertOne(timestamp, attributes, sample)
}
func (ms metricsets) upsertOne(timestamp time.Time, attributes attribute.Set, sample *modelpb.MetricsetSample) {
var signatureBuilder strings.Builder
iter := attributes.Iter()
for iter.Next() {
_, kv := iter.IndexedAttribute()
signatureBuilder.WriteString(string(kv.Key))
signatureBuilder.WriteString(kv.Value.Emit())
}
key := metricsetKey{timestamp: timestamp, signature: signatureBuilder.String()}
m, ok := ms[key]
if !ok {
m = metricset{
attributes: attributes,
samples: make(map[string]*modelpb.MetricsetSample),
}
ms[key] = m
}
m.samples[sample.Name] = sample
}