module/apmotel/gatherer.go (107 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package apmotel // import "go.elastic.co/apm/module/apmotel/v2" import ( "context" "fmt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.elastic.co/apm/v2" ) // NewGatherer creates a new gatherer/exporter to bridge between agent metrics and OpenTelemetry func NewGatherer(opts ...GathererOption) (Gatherer, error) { cfg := newGathererConfig(opts...) reader := metric.NewManualReader(cfg.manualReaderOptions()...) return Gatherer{reader}, nil } // Gatherer is a gatherer/exporter which can act as an OpenTelemetry manual // reader, to retrieve metrics and expose them to the agent. type Gatherer struct { metric.Reader } var _ metric.Reader = Gatherer{} var _ apm.MetricsGatherer = Gatherer{} // GatherMetrics gathers metrics into out. func (e Gatherer) GatherMetrics(ctx context.Context, out *apm.Metrics) error { metrics := metricdata.ResourceMetrics{} err := e.Reader.Collect(ctx, &metrics) if err != nil { return err } for _, scopeMetrics := range metrics.ScopeMetrics { for _, sm := range scopeMetrics.Metrics { switch m := sm.Data.(type) { case metricdata.Histogram[int64]: addHistogramMetric(out, sm, m) case metricdata.Histogram[float64]: addHistogramMetric(out, sm, m) case metricdata.Sum[int64]: for _, dp := range m.DataPoints { if m.Temporality == metricdata.DeltaTemporality && dp.Value == 0 { continue } out.Add(sm.Name, makeLabels(dp.Attributes), float64(dp.Value)) } case metricdata.Sum[float64]: for _, dp := range m.DataPoints { if m.Temporality == metricdata.DeltaTemporality && dp.Value == 0 { continue } out.Add(sm.Name, makeLabels(dp.Attributes), dp.Value) } case metricdata.Gauge[int64]: for _, dp := range m.DataPoints { out.Add(sm.Name, makeLabels(dp.Attributes), float64(dp.Value)) } case metricdata.Gauge[float64]: for _, dp := range m.DataPoints { out.Add(sm.Name, makeLabels(dp.Attributes), dp.Value) } default: return fmt.Errorf("unknown metric type %q", m) } } } return nil } func addHistogramMetric[T int64 | float64](out *apm.Metrics, sm metricdata.Metrics, m metricdata.Histogram[T]) { for _, dp := range m.DataPoints { if len(dp.BucketCounts) != len(dp.Bounds)+1 || len(dp.Bounds) == 0 { continue } bounds := make([]float64, 0, len(dp.Bounds)) counts := make([]uint64, 0, len(dp.BucketCounts)) for i, _ := range dp.BucketCounts { bound, count := computeCountAndBounds(i, dp.Bounds, dp.BucketCounts) if count == 0 { continue } counts = append(counts, count) bounds = append(bounds, bound) } out.AddHistogram(sm.Name, makeLabels(dp.Attributes), bounds, counts) } } func computeCountAndBounds(i int, bounds []float64, counts []uint64) (float64, uint64) { count := counts[i] if count == 0 { return 0, 0 } var bound float64 switch i { // (-infinity, explicit_bounds[i]] case 0: bound = bounds[i] if bound > 0 { bound /= 2 } // (explicit_bounds[i], +infinity) case len(counts) - 1: bound = bounds[i-1] // [explicit_bounds[i-1], explicit_bounds[i]) default: // Use the midpoint between the boundaries. bound = bounds[i-1] + (bounds[i]-bounds[i-1])/2.0 } return bound, count } func makeLabels(attrs attribute.Set) []apm.MetricLabel { labels := make([]apm.MetricLabel, attrs.Len()) iter := attrs.Iter() for iter.Next() { i, kv := iter.IndexedAttribute() labels[i] = apm.MetricLabel{Name: string(kv.Key), Value: kv.Value.Emit()} } return labels }