module/apmelasticsearch/client.go (197 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package apmelasticsearch // import "go.elastic.co/apm/module/apmelasticsearch/v2"
import (
"bytes"
"compress/gzip"
"io"
"io/ioutil"
"net/http"
"net/url"
"path"
"sync/atomic"
"unsafe"
"go.elastic.co/apm/module/apmhttp/v2"
"go.elastic.co/apm/v2"
)
// WrapRoundTripper returns an http.RoundTripper wrapping r, reporting each
// request as a span to Elastic APM, if the request's context contains a
// sampled transaction.
//
// If r is nil, then http.DefaultTransport is wrapped.
func WrapRoundTripper(r http.RoundTripper, o ...ClientOption) http.RoundTripper {
if r == nil {
r = http.DefaultTransport
}
rt := &roundTripper{r: r, clusterNameFunc: DefaultClusterName(r)}
for _, o := range o {
o(rt)
}
return rt
}
type roundTripper struct {
r http.RoundTripper
clusterNameFunc ClusterNameFunc
}
// RoundTrip delegates to r.r, emitting a span if req's context contains a transaction.
//
// If req.URL.Path corresponds to a search request, then RoundTrip will attempt to extract
// the search query to use as the span context's "database statement". If the query is
// passed in as a query parameter (i.e. "/_search?q=foo:bar"), then that will be used;
// otherwise, the request body will be read. In the latter case, req.GetBody is used
// if defined, otherwise we read req.Body, preserving its contents for the underlying
// RoundTripper. If the request body is gzip-encoded, it will be decoded.
func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
ctx := req.Context()
tx := apm.TransactionFromContext(ctx)
if tx == nil {
return r.r.RoundTrip(req)
}
traceContext := tx.TraceContext()
if !tx.Sampled() {
apmhttp.SetHeaders(req, traceContext, false)
return r.r.RoundTrip(req)
}
propagateLegacyHeader := tx.ShouldPropagateLegacyHeader()
name := requestName(req)
span := tx.StartExitSpan(name, "db.elasticsearch", apm.SpanFromContext(ctx))
if span.Dropped() {
span.End()
apmhttp.SetHeaders(req, traceContext, propagateLegacyHeader)
return r.r.RoundTrip(req)
}
traceContext = span.TraceContext()
statement, req := captureSearchStatement(req)
username, _, _ := req.BasicAuth()
ctx = apm.ContextWithSpan(ctx, span)
req = apmhttp.RequestWithContext(ctx, req)
span.Context.SetHTTPRequest(req)
span.Context.SetDestinationService(apm.DestinationServiceSpanContext{
Name: "elasticsearch",
Resource: "elasticsearch",
})
apmhttp.SetHeaders(req, traceContext, propagateLegacyHeader)
resp, err := r.r.RoundTrip(req)
if err != nil {
clusterName := "" // undefined in error case
setDatabaseSpanContext(span, statement, username, clusterName)
span.End()
return resp, err
}
clusterName := r.clusterNameFunc(resp)
setDatabaseSpanContext(span, statement, username, clusterName)
span.Context.SetHTTPStatusCode(resp.StatusCode)
resp.Body = &responseBody{span: span, body: resp.Body}
return resp, nil
}
func setDatabaseSpanContext(span *apm.Span, statement, username, clusterName string) {
span.Context.SetServiceTarget(apm.ServiceTargetSpanContext{
Type: "elasticsearch",
Name: clusterName,
})
span.Context.SetDatabase(apm.DatabaseSpanContext{
Instance: clusterName,
Type: "elasticsearch",
Statement: statement,
User: username,
})
}
// CloseIdleConnections calls r.r.CloseIdleConnections if the method exists.
func (r *roundTripper) CloseIdleConnections() {
type closeIdler interface {
CloseIdleConnections()
}
if tr, ok := r.r.(closeIdler); ok {
tr.CloseIdleConnections()
}
}
type responseBody struct {
span *apm.Span
body io.ReadCloser
}
// Close closes the response body, and ends the span if it hasn't already been ended.
func (b *responseBody) Close() error {
b.endSpan()
return b.body.Close()
}
// Read reads from the response body, and ends the span when io.EOF is returend if
// the span hasn't already been ended.
func (b *responseBody) Read(p []byte) (n int, err error) {
n, err = b.body.Read(p)
if err == io.EOF {
b.endSpan()
}
return n, err
}
func (b *responseBody) endSpan() {
addr := (*unsafe.Pointer)(unsafe.Pointer(&b.span))
if old := atomic.SwapPointer(addr, nil); old != nil {
(*apm.Span)(old).End()
}
}
// ClientOption sets options for tracing client requests.
type ClientOption func(*roundTripper)
// captureSearchStatement captures the search URI query or request body.
//
// If the request must be modified (i.e. because the body must be read),
// then captureSearchStatement returns a new *http.Request to be passed
// to the underlying http.RoundTripper. Otherwise, req is returned.
func captureSearchStatement(req *http.Request) (string, *http.Request) {
if !isSearchURL(req.URL) {
return "", req
}
// If "q" is in query params, use that for statement.
if req.URL.RawQuery != "" {
query := req.URL.Query()
if statement := query.Get("q"); statement != "" {
return statement, req
}
}
if req.Body == nil || req.Body == http.NoBody {
return "", req
}
var bodyBuf bytes.Buffer
if req.GetBody != nil {
// req.GetBody is defined, so we can read a copy of the
// request body instead of messing with the original request
// body.
body, err := req.GetBody()
if err != nil {
return "", req
}
if _, err := bodyBuf.ReadFrom(limitedBody(body, req.ContentLength)); err != nil {
body.Close()
return "", req
}
if err := body.Close(); err != nil {
return "", req
}
} else {
type readCloser struct {
io.Reader
io.Closer
}
newBody := &readCloser{Closer: req.Body}
reqCopy := *req
reqCopy.Body = newBody
if _, err := bodyBuf.ReadFrom(limitedBody(req.Body, req.ContentLength)); err != nil {
// Continue with the request, ensuring that req.Body returns
// the same content and error, but don't use the consumed body
// for the statement.
newBody.Reader = io.MultiReader(bytes.NewReader(bodyBuf.Bytes()), errorReader{err: err})
return "", &reqCopy
}
newBody.Reader = io.MultiReader(bytes.NewReader(bodyBuf.Bytes()), req.Body)
req = &reqCopy
}
var statement string
if req.Header.Get("Content-Encoding") == "gzip" {
if r, err := gzip.NewReader(&bodyBuf); err == nil {
if content, err := ioutil.ReadAll(r); err == nil {
statement = string(content)
}
}
} else {
statement = bodyBuf.String()
}
return statement, req
}
func isSearchURL(url *url.URL) bool {
switch dir, file := path.Split(url.Path); file {
case "_search", "_msearch", "_rollup_search":
return true
case "template":
if dir == "" {
return false
}
switch _, file := path.Split(dir[:len(dir)-1]); file {
case "_search", "_msearch":
// ".../_search/template" or ".../_msearch/template"
return true
}
}
return false
}
func limitedBody(r io.Reader, n int64) io.Reader {
// maxLimit is the maximum size of the request body that we'll read,
// set to 10000 to match the maximum length of the "db.statement"
// span context field.
const maxLimit = 10000
if n <= 0 {
return r
}
if n > maxLimit {
n = maxLimit
}
return &io.LimitedReader{R: r, N: n}
}
type errorReader struct {
err error
}
func (r errorReader) Read(p []byte) (int, error) {
return 0, r.err
}