pkg/controller/common/tracing/apmclientgo/client.go (143 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package apmclientgo import ( "io" "net/http" "strings" "sync/atomic" "unsafe" "github.com/pkg/errors" "go.elastic.co/apm/module/apmhttp/v2" "go.elastic.co/apm/v2" ) // WrapRoundTripper returns a http.Roundtripper wrapping r, reporting each // request as a span to Elastic APM, if the request's context contains a sampled transaction // Allows an optional default transaction to be configured for requests where context cannot be controlled // for example client-go's cache management func WrapRoundTripper(r http.RoundTripper, o ...ClientOption) http.RoundTripper { if r == nil { r = http.DefaultTransport } rt := &roundTripper{r: r} // apply any client options for _, o := range o { o(rt) } return rt } type roundTripper struct { r http.RoundTripper defaultTxFn func() *apm.Transaction } func (r roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { ctx := req.Context() tx := apm.TransactionFromContext(ctx) if tx == nil && r.defaultTxFn != nil { tx = r.defaultTxFn() if tx != nil { defer tx.End() } } if tx == nil { return r.r.RoundTrip(req) } traceContext := tx.TraceContext() if !tx.Sampled() { apmhttp.SetHeaders(req, traceContext, false) return r.r.RoundTrip(req) } propagateLegacyHeader := tx.ShouldPropagateLegacyHeader() requestName := requestName(req) name := spanName(requestName) span := tx.StartSpan(name, "db.kubernetes", apm.SpanFromContext(ctx)) if span.Dropped() { span.End() apmhttp.SetHeaders(req, traceContext, propagateLegacyHeader) return r.r.RoundTrip(req) } traceContext = span.TraceContext() ctx = apm.ContextWithSpan(ctx, span) req = apmhttp.RequestWithContext(ctx, req) span.Context.SetHTTPRequest(req) span.Context.SetDestinationService(apm.DestinationServiceSpanContext{ Name: "Kubernetes API server", Resource: "Kubernetes", }) span.Context.SetDatabase(apm.DatabaseSpanContext{ Statement: requestName, Type: "kubernetes", }) apmhttp.SetHeaders(req, traceContext, propagateLegacyHeader) resp, err := r.r.RoundTrip(req) if err != nil { span.End() } else { span.Context.SetHTTPStatusCode(resp.StatusCode) resp.Body = &responseBody{span: span, body: resp.Body} } return resp, err } // CloseIdleConnections calls r.r.CloseIdleConnections if the method exists. func (r *roundTripper) CloseIdleConnections() { type closeIdler interface { CloseIdleConnections() } if tr, ok := r.r.(closeIdler); ok { tr.CloseIdleConnections() } } // CancelRequest calls r.r.CancelRequest(req) if the method exists. func (r *roundTripper) CancelRequest(req *http.Request) { type cancelRequester interface { CancelRequest(*http.Request) } if r, ok := r.r.(cancelRequester); ok { r.CancelRequest(req) } } type responseBody struct { span *apm.Span body io.ReadCloser } // Close closes the response body, and ends the span if it hasn't already been ended. func (b *responseBody) Close() error { b.endSpan() return b.body.Close() } // Read reads from the response body, and ends the span when io.EOF is returned if // the span hasn't already been ended. func (b *responseBody) Read(p []byte) (n int, err error) { n, err = b.body.Read(p) if errors.Is(err, io.EOF) { b.endSpan() } return n, err } func (b *responseBody) endSpan() { addr := (*unsafe.Pointer)(unsafe.Pointer(&b.span)) if old := atomic.SwapPointer(addr, nil); old != nil { (*apm.Span)(old).End() } } func spanName(reqName string) string { const prefix = "Kubernetes:" var b strings.Builder b.Grow(len(prefix) + 1 + len(reqName)) b.WriteString(prefix) b.WriteRune(' ') b.WriteString(reqName) return b.String() } func requestName(req *http.Request) string { statement := req.Method numSegments := 2 // add a bit more context in the summary for PUT requests e.g. namespace if req.Method == "PUT" { numSegments = 3 } pathSegments := strings.Split(req.URL.Path, "/") path := strings.Join(pathSegments[len(pathSegments)-numSegments:], "/") // let's call out watch requests explicitly if watch := req.URL.Query().Get("watch"); watch == "true" { statement = "WATCH" } var b strings.Builder b.Grow(len(statement) + 1 + len(path)) b.WriteString(statement) b.WriteRune(' ') b.WriteString(path) return b.String() } // ClientOption sets options for tracing client requests. type ClientOption func(*roundTripper) // WithDefaultTransaction configures the roundtripper to start a new APM transaction if no transaction is currently running // using the factory function f. func WithDefaultTransaction(f func() *apm.Transaction) ClientOption { return ClientOption(func(rt *roundTripper) { rt.defaultTxFn = f }) }