internal/beater/otlp/http.go (163 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package otlp import ( "context" "fmt" "io" "net/http" "sync" "go.opentelemetry.io/collector/pdata/plog/plogotlp" "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" "go.opentelemetry.io/otel/metric" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" "github.com/elastic/apm-data/input" "github.com/elastic/apm-data/input/otlp" "github.com/elastic/apm-data/model/modelpb" ) var ( httpMetricRegistrationMu sync.Mutex unsupportedHTTPMetricRegistration metric.Registration ) func NewHTTPHandlers(logger *zap.Logger, processor modelpb.BatchProcessor, semaphore input.Semaphore, mp metric.MeterProvider) HTTPHandlers { // TODO(axw) stop assuming we have only one OTLP HTTP consumer running // at any time, and instead aggregate metrics from consumers that are // dynamically registered and unregistered. consumer := otlp.NewConsumer(otlp.ConsumerConfig{ Processor: processor, Logger: logger, Semaphore: semaphore, RemapOTelMetrics: true, }) meter := mp.Meter("github.com/elastic/apm-server/internal/beater/otlp") httpMetricsConsumerUnsupportedDropped, _ := meter.Int64ObservableCounter( "apm-server.otlp.http.metrics.consumer.unsupported_dropped", ) httpMetricRegistrationMu.Lock() defer httpMetricRegistrationMu.Unlock() // TODO we should add an otel counter metric directly in the // apm-data consumer, then we could get rid of the callback. if unsupportedHTTPMetricRegistration != nil { _ = unsupportedHTTPMetricRegistration.Unregister() } unsupportedHTTPMetricRegistration, _ = meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error { stats := consumer.Stats() if stats.UnsupportedMetricsDropped > 0 { o.ObserveInt64(httpMetricsConsumerUnsupportedDropped, stats.UnsupportedMetricsDropped) } return nil }, httpMetricsConsumerUnsupportedDropped) return HTTPHandlers{consumer: consumer} } // HTTPHandlers encapsulates http.HandlerFuncs for handling traces, metrics, and logs. type HTTPHandlers struct { consumer *otlp.Consumer } // HandleTraces is an http.HandlerFunc that receives a protobuf-encoded traces export // request, and processes it with the handler's OTLP consumer. func (h HTTPHandlers) HandleTraces(w http.ResponseWriter, r *http.Request) { req := ptraceotlp.NewExportRequest() if err := h.readRequest(r, req); err != nil { h.writeError(w, err, http.StatusBadRequest) return } var result otlp.ConsumeTracesResult var err error if result, err = h.consumer.ConsumeTracesWithResult(r.Context(), req.Traces()); err != nil { h.writeError(w, err, http.StatusInternalServerError) return } resp := ptraceotlp.NewExportResponse() if result.RejectedSpans > 0 { resp.PartialSuccess().SetRejectedSpans(result.RejectedSpans) resp.PartialSuccess().SetErrorMessage(result.ErrorMessage) } if err := h.writeResponse(w, resp); err != nil { h.writeError(w, err, http.StatusInternalServerError) return } } // HandleMetrics is an http.HandlerFunc that receives a protobuf-encoded metrics export // request, and processes it with the handler's OTLP consumer. func (h HTTPHandlers) HandleMetrics(w http.ResponseWriter, r *http.Request) { req := pmetricotlp.NewExportRequest() if err := h.readRequest(r, req); err != nil { h.writeError(w, err, http.StatusBadRequest) return } var result otlp.ConsumeMetricsResult var err error if result, err = h.consumer.ConsumeMetricsWithResult(r.Context(), req.Metrics()); err != nil { h.writeError(w, err, http.StatusInternalServerError) return } resp := pmetricotlp.NewExportResponse() if result.RejectedDataPoints > 0 { resp.PartialSuccess().SetRejectedDataPoints(result.RejectedDataPoints) resp.PartialSuccess().SetErrorMessage(result.ErrorMessage) } if err := h.writeResponse(w, resp); err != nil { h.writeError(w, err, http.StatusInternalServerError) return } } // HandleLogs is an http.HandlerFunc that receives a protobuf-encoded logs export // request, and processes it with the handler's OTLP consumer. func (h HTTPHandlers) HandleLogs(w http.ResponseWriter, r *http.Request) { req := plogotlp.NewExportRequest() if err := h.readRequest(r, req); err != nil { h.writeError(w, err, http.StatusBadRequest) return } var result otlp.ConsumeLogsResult var err error if result, err = h.consumer.ConsumeLogsWithResult(r.Context(), req.Logs()); err != nil { h.writeError(w, err, http.StatusInternalServerError) return } resp := plogotlp.NewExportResponse() if result.RejectedLogRecords > 0 { resp.PartialSuccess().SetRejectedLogRecords(result.RejectedLogRecords) resp.PartialSuccess().SetErrorMessage(result.ErrorMessage) } if err := h.writeResponse(w, resp); err != nil { h.writeError(w, err, http.StatusInternalServerError) return } } type protoUnmarshaler interface { UnmarshalProto([]byte) error } type protoMarshaler interface { MarshalProto() ([]byte, error) } func (h HTTPHandlers) readRequest(req *http.Request, out protoUnmarshaler) error { body, err := io.ReadAll(req.Body) if err != nil { return fmt.Errorf("failed to read request body: %w", err) } if err := out.UnmarshalProto(body); err != nil { return fmt.Errorf("failed to unmarshal request body: %w", err) } return nil } func (h HTTPHandlers) writeResponse(w http.ResponseWriter, m protoMarshaler) error { body, err := m.MarshalProto() if err != nil { return fmt.Errorf("failed to marshal response: %w", err) } w.Header().Set("Content-Type", "application/x-protobuf") w.WriteHeader(http.StatusOK) w.Write(body) return nil } func (h HTTPHandlers) writeError(w http.ResponseWriter, err error, statusCode int) { s, ok := status.FromError(err) if !ok { if statusCode == http.StatusBadRequest { s = status.New(codes.InvalidArgument, err.Error()) } else { s = status.New(codes.Unknown, err.Error()) } } msg, err := proto.Marshal(s.Proto()) if err != nil { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(`{"code": 13, "message": "failed to marshal error message"}`)) return } w.Header().Set("Content-Type", "application/x-protobuf") w.WriteHeader(statusCode) w.Write(msg) }