collector/otlp/metrics_transfer.go (161 lines of code) (raw):
package otlp
import (
"context"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"strconv"
v1 "buf.build/gen/go/opentelemetry/opentelemetry/protocolbuffers/go/opentelemetry/proto/collector/metrics/v1"
"github.com/Azure/adx-mon/metrics"
"github.com/Azure/adx-mon/pkg/logger"
connect "github.com/bufbuild/connect-go"
gbp "github.com/libp2p/go-buffer-pool"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/protobuf/proto"
)
type MetricsService struct {
writer MetricWriter
logger *slog.Logger
httpReqReceived *prometheus.CounterVec
grpcReqReceived *prometheus.CounterVec
}
func NewMetricsService(writer MetricWriter, path string, grpcPort int) *MetricsService {
return &MetricsService{
writer: writer,
logger: slog.Default().With(
slog.Group(
"handler",
slog.String("protocol", "otlp-metrics"),
),
),
httpReqReceived: metrics.MetricsRequestsReceived.MustCurryWith(prometheus.Labels{"protocol": "oltp/http", "endpoint": path}),
grpcReqReceived: metrics.MetricsRequestsReceived.MustCurryWith(prometheus.Labels{"protocol": "oltp/grpc", "endpoint": fmt.Sprintf(":%d", grpcPort)}),
}
}
// Export implements metricsv1connect.MetricsServiceHandler to handle OLTP/GRPC metrics requests
func (s *MetricsService) Export(ctx context.Context, req *connect.Request[v1.ExportMetricsServiceRequest]) (*connect.Response[v1.ExportMetricsServiceResponse], error) {
resp, status, code := s.writeMessage(ctx, req.Msg)
if status != nil {
var c connect.Code = connect.CodeUnavailable
if code >= 400 && code < 500 {
c = connect.CodeInvalidArgument
}
err := connect.NewError(c, nil)
if detail, detailErr := connect.NewErrorDetail(status); detailErr == nil {
err.AddDetail(detail)
}
s.grpcReqReceived.WithLabelValues(c.String()).Inc()
return nil, err
}
s.grpcReqReceived.WithLabelValues("ok").Inc()
return connect.NewResponse(resp), nil
}
// Handler handles OTLP/HTTP metrics requests
// See https://opentelemetry.io/docs/specs/otlp/#otlphttp
func (s *MetricsService) Handler(w http.ResponseWriter, r *http.Request) {
m := s.httpReqReceived
defer r.Body.Close()
ctx := r.Context()
switch r.Header.Get("Content-Type") {
case "application/x-protobuf":
// Consume the request body and marshal into a protobuf
b := gbp.Get(int(r.ContentLength))
defer gbp.Put(b)
n, err := io.ReadFull(r.Body, b)
if err != nil {
s.logger.Error("Failed to read request body", "Error", err)
status := newErrorStatus("Failed to read request body.")
writeErrorStatusResponse(w, http.StatusInternalServerError, status, m)
return
}
if n < int(r.ContentLength) {
s.logger.Warn("Short read")
status := newErrorStatus("Body did not contain enough bytes.")
writeErrorStatusResponse(w, http.StatusBadRequest, status, m)
return
}
b = b[:n]
if logger.IsDebug() {
s.logger.Debug("Received request body", "Bytes", n)
}
msg := &v1.ExportMetricsServiceRequest{}
if err := proto.Unmarshal(b, msg); err != nil {
s.logger.Error("Failed to unmarshal request body", "Error", err)
status := newErrorStatus("Unable to unmarshal ExportMetricsServiceRequest.")
writeErrorStatusResponse(w, http.StatusBadRequest, status, m)
return
}
resp, status, code := s.writeMessage(ctx, msg)
if status != nil {
writeErrorStatusResponse(w, code, status, m)
} else {
writeExportMetricsServiceResponse(w, resp, m)
}
default:
logger.Errorf("Unsupported Content-Type: %s", r.Header.Get("Content-Type"))
w.WriteHeader(http.StatusUnsupportedMediaType)
m.WithLabelValues(strconv.Itoa(http.StatusUnsupportedMediaType)).Inc()
}
}
// writeMessage writes the metrics to the writer and returns the response, status message (for errors), and HTTP status code
func (s *MetricsService) writeMessage(ctx context.Context, msg *v1.ExportMetricsServiceRequest) (*v1.ExportMetricsServiceResponse, *status.Status, int) {
err := s.writer.Write(ctx, msg)
if err != nil {
if errors.Is(err, ErrUnknownMetricType) {
s.logger.Warn("Received unknown metric type", "Error", err)
status := newErrorStatus("Unknown metric type")
return nil, status, http.StatusBadRequest
}
// Rejected metrics are not an error - return OK with the count of rejected metrics and the error.
var rejectedMetricsErr *ErrRejectedMetric
if errors.As(err, &rejectedMetricsErr) {
s.logger.Warn("Rejecting some metrics", "Error", err)
resp := &v1.ExportMetricsServiceResponse{
PartialSuccess: &v1.ExportMetricsPartialSuccess{
RejectedDataPoints: rejectedMetricsErr.Count,
ErrorMessage: rejectedMetricsErr.Msg,
},
}
return resp, nil, http.StatusOK
}
var writeErr *ErrWriteError
if errors.As(err, &writeErr) {
s.logger.Error("Failed to write metrics", "Error", err)
status := newErrorStatus("Failed to write metrics. Please try again.")
return nil, status, http.StatusInternalServerError
} else {
// Unknown error
s.logger.Error("Failed to forward metrics with unknown error", "Error", err)
status := newErrorStatus("Internal Server Error. Please try again.")
return nil, status, http.StatusInternalServerError
}
}
return &v1.ExportMetricsServiceResponse{}, nil, http.StatusOK
}
func writeExportMetricsServiceResponse(w http.ResponseWriter, resp *v1.ExportMetricsServiceResponse, m *prometheus.CounterVec) {
w.Header().Add("Content-Type", "application/x-protobuf")
respBodyBytes, err := proto.Marshal(resp)
if err != nil {
logger.Error("Failed to marshal response", "Error", err)
w.WriteHeader(http.StatusInternalServerError)
m.WithLabelValues(strconv.Itoa(http.StatusInternalServerError)).Inc()
return
}
m.WithLabelValues(strconv.Itoa(http.StatusOK)).Inc()
w.WriteHeader(http.StatusOK)
w.Write(respBodyBytes)
}
func writeErrorStatusResponse(w http.ResponseWriter, statusCode int, status *status.Status, m *prometheus.CounterVec) {
w.Header().Add("Content-Type", "application/x-protobuf")
respBodyBytes, err := proto.Marshal(status)
if err != nil {
logger.Error("Failed to marshal response", "Error", err)
w.WriteHeader(http.StatusInternalServerError)
m.WithLabelValues(strconv.Itoa(http.StatusInternalServerError)).Inc()
return
}
m.WithLabelValues(strconv.Itoa(statusCode)).Inc()
w.WriteHeader(statusCode)
w.Write(respBodyBytes)
}
func newErrorStatus(msg string) *status.Status {
return &status.Status{
Message: msg,
}
}