internal/beater/interceptors/metrics.go (97 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package interceptors import ( "context" "sync" "time" "go.opentelemetry.io/otel/metric" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "github.com/elastic/apm-server/internal/beater/request" "github.com/elastic/elastic-agent-libs/logp" ) const ( requestDurationHistogram = "request.duration" ) type metricsInterceptor struct { logger *logp.Logger meter metric.Meter counters sync.Map histograms sync.Map } func (m *metricsInterceptor) Interceptor() grpc.UnaryServerInterceptor { return func( ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, ) (interface{}, error) { var legacyMetricsPrefix string switch info.FullMethod { case "/opentelemetry.proto.collector.metrics.v1.MetricsService/Export": legacyMetricsPrefix = "apm-server.otlp.grpc.metrics." case "/opentelemetry.proto.collector.trace.v1.TraceService/Export": legacyMetricsPrefix = "apm-server.otlp.grpc.traces." case "/opentelemetry.proto.collector.logs.v1.LogsService/Export": legacyMetricsPrefix = "apm-server.otlp.grpc.logs." default: m.logger.With( "grpc.request.method", info.FullMethod, ).Warn("metrics registry missing") return handler(ctx, req) } m.inc(legacyMetricsPrefix, request.IDRequestCount) defer m.inc(legacyMetricsPrefix, request.IDResponseCount) start := time.Now() resp, err := handler(ctx, req) duration := time.Since(start) m.getHistogram(requestDurationHistogram, metric.WithUnit("ms")).Record(context.Background(), duration.Milliseconds()) responseID := request.IDResponseValidCount if err != nil { responseID = request.IDResponseErrorsCount if s, ok := status.FromError(err); ok { switch s.Code() { case codes.Unauthenticated: m.inc(legacyMetricsPrefix, request.IDResponseErrorsUnauthorized) case codes.DeadlineExceeded, codes.Canceled: m.inc(legacyMetricsPrefix, request.IDResponseErrorsTimeout) case codes.ResourceExhausted: m.inc(legacyMetricsPrefix, request.IDResponseErrorsRateLimit) } } } m.inc(legacyMetricsPrefix, responseID) return resp, err } } func (m *metricsInterceptor) inc(legacyMetricsPrefix string, id request.ResultID) { m.getCounter("grpc.server.", string(id)).Add(context.Background(), 1) m.getCounter(legacyMetricsPrefix, string(id)).Add(context.Background(), 1) } func (m *metricsInterceptor) getCounter(prefix, n string) metric.Int64Counter { name := prefix + n if met, ok := m.counters.Load(name); ok { return met.(metric.Int64Counter) } nm, _ := m.meter.Int64Counter(name) met, _ := m.counters.LoadOrStore(name, nm) return met.(metric.Int64Counter) } func (m *metricsInterceptor) getHistogram(n string, opts ...metric.Int64HistogramOption) metric.Int64Histogram { name := "grpc.server." + n if met, ok := m.histograms.Load(name); ok { return met.(metric.Int64Histogram) } nm, _ := m.meter.Int64Histogram(name, opts...) met, _ := m.histograms.LoadOrStore(name, nm) return met.(metric.Int64Histogram) } // Metrics returns a grpc.UnaryServerInterceptor that increments metrics // for gRPC method calls. // // If a gRPC service implements UnaryRequestMetrics, its RequestMetrics // method will be called to obtain the metrics map for incrementing. If the // service does not implement UnaryRequestMetrics, but // RegisterMethodUnaryRequestMetrics has been called for the invoked method, // then the registered UnaryRequestMetrics will be used instead. Finally, // if neither of these are available, a warning will be logged and no metrics // will be gathered. func Metrics(logger *logp.Logger, mp metric.MeterProvider) grpc.UnaryServerInterceptor { i := &metricsInterceptor{ logger: logger, meter: mp.Meter("github.com/elastic/apm-server/internal/beater/interceptors"), counters: sync.Map{}, histograms: sync.Map{}, } return i.Interceptor() }