module/apmgrpc/client.go (207 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package apmgrpc // import "go.elastic.co/apm/module/apmgrpc/v2" import ( "context" "net" "net/http" "net/url" "sync" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" "go.elastic.co/apm/module/apmhttp/v2" "go.elastic.co/apm/v2" ) // NewUnaryClientInterceptor returns a grpc.UnaryClientInterceptor that // traces gRPC requests with the given options. // // The interceptor will trace spans with the "external.grpc" type for each // request made, for any client method presented with a context containing // a sampled apm.Transaction. func NewUnaryClientInterceptor(o ...ClientOption) grpc.UnaryClientInterceptor { opts := clientOptions{} for _, o := range o { o(&opts) } return func( ctx context.Context, method string, req, resp interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption, ) error { var peer peer.Peer // maybe set after call if span != nil var header metadata.MD // maybe set after call if span != nil span, ctx := startSpan(ctx, method) if span != nil { defer span.End() opts = append(opts, grpc.Peer(&peer), grpc.Header(&header)) } err := invoker(ctx, method, req, resp, cc, opts...) if span != nil { setSpanOutcome(span, err) url := url.URL{Scheme: "http", Path: method} if _, ok := peer.AuthInfo.(credentials.TLSInfo); ok { url.Scheme = "https" } if peer.Addr != nil { url.Host = peer.Addr.String() } span.Context.SetHTTPRequest(&http.Request{ URL: &url, Method: "POST", // method is always POST ProtoMajor: 2, ProtoMinor: 0, Header: http.Header(header), }) if url.Host != "" { span.Context.SetDestinationService(apm.DestinationServiceSpanContext{ Name: url.Host, Resource: url.Host, }) } span.Context.SetServiceTarget(apm.ServiceTargetSpanContext{ Name: url.Host, }) } return err } } // NewStreamClientInterceptor returns a grpc.StreamClientInterceptor that // traces gRPC requests with the given options. // // The interceptor will trace spans with the "external.grpc" type for each // stream request made, for any client method presented with a context // containing a sampled apm.Transaction. // // Spans are ended when the stream is closed, which can happen in various // ways: the initial stream setup request fails, Header, SendMsg or RecvMsg // return with an error, or RecvMsg returns for a non-streaming server. func NewStreamClientInterceptor(o ...ClientOption) grpc.StreamClientInterceptor { opts := clientOptions{} for _, o := range o { o(&opts) } return func( ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption, ) (grpc.ClientStream, error) { var peer peer.Peer span, ctx := startSpan(ctx, method) if span != nil { opts = append(opts, grpc.Peer(&peer)) } stream, err := streamer(ctx, desc, cc, method, opts...) if span != nil { if err != nil { setSpanOutcome(span, err) setSpanContext(span, peer) span.End() } else if stream != nil { wrapped := &clientStream{ClientStream: stream} go func(stream grpc.ClientStream) { defer span.End() // Header blocks until headers are available // or the stream is ended. Either way, after // Header returns, it is safe to call Context(). stream.Header() <-stream.Context().Done() err := wrapped.getError() setSpanOutcome(span, err) setSpanContext(span, peer) }(stream) stream = wrapped } } return stream, err } } // clientStream wraps grpc.ClientStream to intercept errors. type clientStream struct { grpc.ClientStream mu sync.RWMutex err error } func (s *clientStream) CloseSend() error { err := s.ClientStream.CloseSend() s.setError(err) return err } func (s *clientStream) Header() (metadata.MD, error) { md, err := s.ClientStream.Header() s.setError(err) return md, err } func (s *clientStream) SendMsg(m interface{}) error { err := s.ClientStream.SendMsg(m) s.setError(err) return err } func (s *clientStream) RecvMsg(m interface{}) error { err := s.ClientStream.RecvMsg(m) s.setError(err) return err } func (s *clientStream) getError() error { s.mu.RLock() defer s.mu.RUnlock() return s.err } func (s *clientStream) setError(err error) { if err != nil { s.mu.Lock() s.err = err s.mu.Unlock() } } func startSpan(ctx context.Context, name string) (*apm.Span, context.Context) { tx := apm.TransactionFromContext(ctx) if tx == nil { return nil, ctx } traceContext := tx.TraceContext() propagateLegacyHeader := tx.ShouldPropagateLegacyHeader() if !traceContext.Options.Recorded() { return nil, outgoingContextWithTraceContext(ctx, traceContext, propagateLegacyHeader) } span := tx.StartExitSpan(name, "external.grpc", apm.SpanFromContext(ctx)) if !span.Dropped() { traceContext = span.TraceContext() ctx = apm.ContextWithSpan(ctx, span) } return span, outgoingContextWithTraceContext(ctx, traceContext, propagateLegacyHeader) } func setSpanContext(span *apm.Span, peer peer.Peer) { if peer.Addr != nil { if tcpAddr, ok := peer.Addr.(*net.TCPAddr); ok { span.Context.SetDestinationAddress(tcpAddr.IP.String(), tcpAddr.Port) } addrString := peer.Addr.String() span.Context.SetDestinationService(apm.DestinationServiceSpanContext{ Name: addrString, Resource: addrString, }) } } func outgoingContextWithTraceContext( ctx context.Context, traceContext apm.TraceContext, propagateLegacyHeader bool, ) context.Context { traceparentValue := apmhttp.FormatTraceparentHeader(traceContext) md, ok := metadata.FromOutgoingContext(ctx) if !ok { md = metadata.Pairs(w3cTraceparentHeader, traceparentValue) } else { md = md.Copy() md.Set(w3cTraceparentHeader, traceparentValue) } if propagateLegacyHeader { md.Set(elasticTraceparentHeader, traceparentValue) } if tracestate := traceContext.State.String(); tracestate != "" { md.Set(tracestateHeader, tracestate) } return metadata.NewOutgoingContext(ctx, md) } func setSpanOutcome(span *apm.Span, err error) { statusCode := statusCodeFromError(err) // On the client side, all codes except for OK are treated as failures // by default, and can be overridden by setting the Outcome explicitly. if span.Outcome == "" { switch statusCode { case codes.OK: span.Outcome = "success" default: span.Outcome = "failure" } } } type clientOptions struct { tracer *apm.Tracer } // ClientOption sets options for client-side tracing. type ClientOption func(*clientOptions)