module/apmgrpc/server.go (231 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package apmgrpc // import "go.elastic.co/apm/module/apmgrpc/v2"

import (
	"context"
	"crypto/tls"
	"net/http"
	"net/url"
	"strings"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/status"

	"go.elastic.co/apm/module/apmhttp/v2"
	"go.elastic.co/apm/v2"
)

// Lower-cased forms of the apmhttp trace header names. They are used as
// keys when looking up trace context in incoming gRPC metadata (see
// startTransaction and getIncomingMetadataTraceContext).
var (
	// elasticTraceparentHeader is the fallback traceparent key, consulted
	// only when no usable W3C traceparent value is found.
	elasticTraceparentHeader = strings.ToLower(apmhttp.ElasticTraceparentHeader)
	// w3cTraceparentHeader is the traceparent key that is tried first.
	w3cTraceparentHeader = strings.ToLower(apmhttp.W3CTraceparentHeader)
	// tracestateHeader is the key under which tracestate values are read.
	tracestateHeader = strings.ToLower(apmhttp.TracestateHeader)
)

// NewUnaryServerInterceptor returns a grpc.UnaryServerInterceptor that
// traces gRPC requests with the given options.
//
// The interceptor will trace transactions with the "request" type for
// each incoming request. The transaction will be added to the context,
// so server methods can use apm.StartSpan with the provided context.
//
// By default, the interceptor will trace with apm.DefaultTracer(),
// and will not recover any panics. Use WithTracer to specify an
// alternative tracer, and WithRecovery to enable panic recovery.
func NewUnaryServerInterceptor(o ...ServerOption) grpc.UnaryServerInterceptor { opts := serverOptions{ tracer: apm.DefaultTracer(), recover: false, requestIgnorer: DefaultServerRequestIgnorer(), streamIgnorer: DefaultServerStreamIgnorer(), } for _, o := range o { o(&opts) } return func( ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, ) (resp interface{}, err error) { if !opts.tracer.Recording() || opts.requestIgnorer(info) { return handler(ctx, req) } tx, ctx := startTransaction(ctx, opts.tracer, info.FullMethod) defer tx.End() // TODO(axw) define span context schema for RPC, // including at least the peer address. defer func() { r := recover() if r != nil { e := opts.tracer.Recovered(r) e.SetTransaction(tx) e.Context.SetFramework("grpc", grpc.Version) e.Handled = opts.recover e.Send() if opts.recover { err = status.Errorf(codes.Internal, "%s", r) } else { panic(r) } } setTransactionResult(tx, err) }() resp, err = handler(ctx, req) return resp, err } } // NewStreamServerInterceptor returns a grpc.StreamServerInterceptor that // traces gRPC stream requests with the given options. // // The interceptor will trace transactions with the "request" type for each // incoming stream request. The transaction will be added to the context, so // server methods can use apm.StartSpan with the provided context. // // By default, the interceptor will trace with apm.DefaultTracer(), and will // not recover any panics. Use WithTracer to specify an alternative tracer, // and WithRecovery to enable panic recovery. 
func NewStreamServerInterceptor(o ...ServerOption) grpc.StreamServerInterceptor { opts := serverOptions{ tracer: apm.DefaultTracer(), recover: false, streamIgnorer: DefaultServerStreamIgnorer(), } for _, o := range o { o(&opts) } return func( srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler, ) (err error) { if !opts.tracer.Recording() || opts.streamIgnorer(info) { return handler(srv, stream) } ctx := stream.Context() tx, ctx := startTransaction(ctx, opts.tracer, info.FullMethod) defer tx.End() wrapped := wrapServerStream(stream) wrapped.wrappedContext = ctx // TODO(axw) define span context schema for RPC, // including at least the peer address. defer func() { r := recover() if r != nil { e := opts.tracer.Recovered(r) e.SetTransaction(tx) e.Context.SetFramework("grpc", grpc.Version) e.Handled = opts.recover e.Send() if opts.recover { err = status.Errorf(codes.Internal, "%s", r) } else { panic(r) } } setTransactionResult(tx, err) }() return handler(srv, wrapped) } } func startTransaction(ctx context.Context, tracer *apm.Tracer, name string) (*apm.Transaction, context.Context) { var opts apm.TransactionOptions md, ok := metadata.FromIncomingContext(ctx) if ok { traceContext, ok := getIncomingMetadataTraceContext(md, w3cTraceparentHeader) if !ok { traceContext, _ = getIncomingMetadataTraceContext(md, elasticTraceparentHeader) } opts.TraceContext = traceContext } tx := tracer.StartTransactionOptions(name, "request", opts) tx.Context.SetFramework("grpc", grpc.Version) if peer, ok := peer.FromContext(ctx); ok { // Set underlying HTTP/2.0 request context. 
// // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md var tlsConnectionState *tls.ConnectionState var peerAddr string var authority string url := url.URL{Scheme: "http", Path: name} if info, ok := peer.AuthInfo.(credentials.TLSInfo); ok { url.Scheme = "https" tlsConnectionState = &info.State } if peer.Addr != nil { peerAddr = peer.Addr.String() } if values := md.Get(":authority"); len(values) > 0 { authority = values[0] } tx.Context.SetHTTPRequest(&http.Request{ URL: &url, Method: "POST", // method is always POST ProtoMajor: 2, ProtoMinor: 0, Header: http.Header(md), Host: authority, RemoteAddr: peerAddr, TLS: tlsConnectionState, }) } return tx, apm.ContextWithTransaction(ctx, tx) } func getIncomingMetadataTraceContext(md metadata.MD, header string) (apm.TraceContext, bool) { if values := md.Get(header); len(values) == 1 { traceContext, err := apmhttp.ParseTraceparentHeader(values[0]) if err == nil { traceContext.State, _ = apmhttp.ParseTracestateHeader(md.Get(tracestateHeader)...) return traceContext, true } } return apm.TraceContext{}, false } func setTransactionResult(tx *apm.Transaction, err error) { statusCode := statusCodeFromError(err) tx.Result = statusCode.String() // For gRPC servers, the transaction outcome is generally "success", // except for codes which are not subject to client interpretation. if tx.Outcome == "" { switch statusCode { case codes.Unknown, codes.DeadlineExceeded, codes.ResourceExhausted, codes.FailedPrecondition, codes.Aborted, codes.Internal, codes.Unavailable, codes.DataLoss: tx.Outcome = "failure" default: tx.Outcome = "success" } } } func statusCodeFromError(err error) codes.Code { if err == nil { return codes.OK } statusCode := codes.Unknown if s, ok := status.FromError(err); ok { statusCode = s.Code() } return statusCode } type serverOptions struct { tracer *apm.Tracer recover bool requestIgnorer RequestIgnorerFunc streamIgnorer StreamIgnorerFunc } // ServerOption sets options for server-side tracing. 
type ServerOption func(*serverOptions) // WithTracer returns a ServerOption which sets t as the tracer // to use for tracing server requests. func WithTracer(t *apm.Tracer) ServerOption { if t == nil { panic("t == nil") } return func(o *serverOptions) { o.tracer = t } } // WithRecovery returns a ServerOption which enables panic recovery // in the gRPC server interceptor. // // The interceptor will report panics as errors to Elastic APM, // but unless this is enabled, they will still cause the server to // be terminated. With recovery enabled, panics will be translated // to gRPC errors with the code gprc/codes.Internal. func WithRecovery() ServerOption { return func(o *serverOptions) { o.recover = true } } // RequestIgnorerFunc is the type of a function for use in // WithServerRequestIgnorer. type RequestIgnorerFunc func(*grpc.UnaryServerInfo) bool // WithServerRequestIgnorer returns a ServerOption which sets r as the // function to use to determine whether or not a server request should // be ignored. If r is nil, all requests will be reported. func WithServerRequestIgnorer(r RequestIgnorerFunc) ServerOption { if r == nil { r = IgnoreNone } return func(o *serverOptions) { o.requestIgnorer = r } } // StreamIgnorerFunc is the type of a function for use in // WithServerStreamIgnorer. type StreamIgnorerFunc func(*grpc.StreamServerInfo) bool // WithServerStreamIgnorer returns a ServerOption which sets s as the // function to use to determine whether or not a server stream request // should be ignored. If s is nil, all stream requests will be reported. func WithServerStreamIgnorer(s StreamIgnorerFunc) ServerOption { if s == nil { s = IgnoreNoneStream } return func(o *serverOptions) { o.streamIgnorer = s } } // wrappedServerStream is a thin wrapper around grpc.ServerStream that allows modifying context. type wrappedServerStream struct { grpc.ServerStream // wrappedContext is the wrapper's own Context. You can assign it. 
wrappedContext context.Context } // Context returns the wrapper's WrappedContext, overwriting the nested grpc.ServerStream.Context() func (w *wrappedServerStream) Context() context.Context { return w.wrappedContext } // wrapServerStream returns a ServerStream that has the ability to overwrite context. func wrapServerStream(stream grpc.ServerStream) *wrappedServerStream { return &wrappedServerStream{ServerStream: stream, wrappedContext: stream.Context()} }