internal/beater/otlp/grpc.go (102 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package otlp import ( "context" "sync" "go.opentelemetry.io/collector/pdata/plog/plogotlp" "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" "go.opentelemetry.io/otel/metric" "go.uber.org/zap" "google.golang.org/grpc" "github.com/elastic/apm-data/input" "github.com/elastic/apm-data/input/otlp" "github.com/elastic/apm-data/model/modelpb" ) var ( grpcMetricRegistrationMu sync.Mutex unsupportedGRPCMetricRegistration metric.Registration ) // RegisterGRPCServices registers OTLP consumer services with the given gRPC server. func RegisterGRPCServices( grpcServer *grpc.Server, logger *zap.Logger, processor modelpb.BatchProcessor, semaphore input.Semaphore, mp metric.MeterProvider, ) { // TODO(axw) stop assuming we have only one OTLP gRPC service running // at any time, and instead aggregate metrics from consumers that are // dynamically registered and unregistered. consumer := otlp.NewConsumer(otlp.ConsumerConfig{ Processor: processor, Logger: logger, Semaphore: semaphore, RemapOTelMetrics: true, }) meter := mp.Meter("github.com/elastic/apm-server/internal/beater/otlp") grpcMetricsConsumerUnsupportedDropped, _ := meter.Int64ObservableCounter( "apm-server.otlp.grpc.metrics.consumer.unsupported_dropped", ) grpcMetricRegistrationMu.Lock() defer grpcMetricRegistrationMu.Unlock() // TODO we should add an otel counter metric directly in the // apm-data consumer, then we could get rid of the callback. if unsupportedGRPCMetricRegistration != nil { _ = unsupportedGRPCMetricRegistration.Unregister() } unsupportedGRPCMetricRegistration, _ = meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error { stats := consumer.Stats() if stats.UnsupportedMetricsDropped > 0 { o.ObserveInt64(grpcMetricsConsumerUnsupportedDropped, stats.UnsupportedMetricsDropped) } return nil }, grpcMetricsConsumerUnsupportedDropped) ptraceotlp.RegisterGRPCServer(grpcServer, &tracesService{consumer: consumer}) pmetricotlp.RegisterGRPCServer(grpcServer, &metricsService{consumer: consumer}) plogotlp.RegisterGRPCServer(grpcServer, &logsService{consumer: consumer}) } type tracesService struct { ptraceotlp.UnimplementedGRPCServer consumer *otlp.Consumer } func (s *tracesService) Export(ctx context.Context, req ptraceotlp.ExportRequest) (ptraceotlp.ExportResponse, error) { td := req.Traces() if td.SpanCount() == 0 { return ptraceotlp.NewExportResponse(), nil } resp := ptraceotlp.NewExportResponse() result, err := s.consumer.ConsumeTracesWithResult(ctx, td) if err == nil && result.RejectedSpans > 0 { resp.PartialSuccess().SetRejectedSpans(result.RejectedSpans) resp.PartialSuccess().SetErrorMessage(result.ErrorMessage) } return resp, err } type metricsService struct { pmetricotlp.UnimplementedGRPCServer consumer *otlp.Consumer } func (s *metricsService) Export(ctx context.Context, req pmetricotlp.ExportRequest) (pmetricotlp.ExportResponse, error) { md := req.Metrics() if md.DataPointCount() == 0 { return pmetricotlp.NewExportResponse(), nil } resp := pmetricotlp.NewExportResponse() result, err := s.consumer.ConsumeMetricsWithResult(ctx, md) if err == nil && result.RejectedDataPoints > 0 { resp.PartialSuccess().SetRejectedDataPoints(result.RejectedDataPoints) resp.PartialSuccess().SetErrorMessage(result.ErrorMessage) } return resp, err } type logsService struct { plogotlp.UnimplementedGRPCServer consumer *otlp.Consumer } func (s *logsService) Export(ctx context.Context, req plogotlp.ExportRequest) (plogotlp.ExportResponse, error) { ld := req.Logs() if ld.LogRecordCount() == 0 { return plogotlp.NewExportResponse(), nil } resp := plogotlp.NewExportResponse() result, err := s.consumer.ConsumeLogsWithResult(ctx, ld) if err == nil && result.RejectedLogRecords > 0 { resp.PartialSuccess().SetRejectedLogRecords(result.RejectedLogRecords) resp.PartialSuccess().SetErrorMessage(result.ErrorMessage) } return resp, err }