elastictransport/instrumentation.go (141 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package elastictransport
import (
"bytes"
"context"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"io"
"net/http"
"strconv"
)
const schemaUrl = "https://opentelemetry.io/schemas/1.21.0"
const tracerName = "elasticsearch-api"
// Constants for Semantic Convention
// see https://opentelemetry.io/docs/specs/semconv/database/elasticsearch/ for details.
const attrDbSystem = "db.system"
const attrDbStatement = "db.statement"
const attrDbOperation = "db.operation"
const attrDbElasticsearchClusterName = "db.elasticsearch.cluster.name"
const attrDbElasticsearchNodeName = "db.elasticsearch.node.name"
const attrHttpRequestMethod = "http.request.method"
const attrUrlFull = "url.full"
const attrServerAddress = "server.address"
const attrServerPort = "server.port"
const attrPathParts = "db.elasticsearch.path_parts."
// Instrumentation defines the interface the client uses to propagate information about the requests.
// Each method is called with the current context or request for propagation.
type Instrumentation interface {
// Start creates the span before building the request, returned context will be propagated to the request by the client.
Start(ctx context.Context, name string) context.Context
// Close will be called once the client has returned.
Close(ctx context.Context)
// RecordError propagates an error.
RecordError(ctx context.Context, err error)
// RecordPathPart provides the path variables, called once per variable in the url.
RecordPathPart(ctx context.Context, pathPart, value string)
// RecordRequestBody provides the endpoint name as well as the current request payload.
RecordRequestBody(ctx context.Context, endpoint string, query io.Reader) io.ReadCloser
// BeforeRequest provides the request and endpoint name, called before sending to the server.
BeforeRequest(req *http.Request, endpoint string)
// AfterRequest provides the request, system used (e.g. elasticsearch) and endpoint name.
// Called after the request has been enhanced with the information from the transport and sent to the server.
AfterRequest(req *http.Request, system, endpoint string)
// AfterResponse provides the response.
AfterResponse(ctx context.Context, res *http.Response)
}
type ElasticsearchOpenTelemetry struct {
tracer trace.Tracer
recordBody bool
}
// NewOtelInstrumentation returns a new instrument for Open Telemetry traces
// If no provider is passed, the instrumentation will fall back to the global otel provider.
// captureSearchBody sets the query capture behavior for search endpoints.
// version should be set to the version provided by the caller.
func NewOtelInstrumentation(provider trace.TracerProvider, captureSearchBody bool, version string, options ...trace.TracerOption) *ElasticsearchOpenTelemetry {
if provider == nil {
provider = otel.GetTracerProvider()
}
options = append(options, trace.WithInstrumentationVersion(version), trace.WithSchemaURL(schemaUrl))
return &ElasticsearchOpenTelemetry{
tracer: provider.Tracer(
tracerName,
options...,
),
recordBody: captureSearchBody,
}
}
// Start begins a new span in the given context with the provided name.
// Span will always have a kind set to trace.SpanKindClient.
// The context span aware is returned for use within the client.
func (i ElasticsearchOpenTelemetry) Start(ctx context.Context, name string) context.Context {
newCtx, _ := i.tracer.Start(ctx, name, trace.WithSpanKind(trace.SpanKindClient))
return newCtx
}
// Close call for the end of the span, preferably defered by the client once started.
func (i ElasticsearchOpenTelemetry) Close(ctx context.Context) {
span := trace.SpanFromContext(ctx)
if span.IsRecording() {
span.End()
}
}
// shouldRecordRequestBody filters for search endpoints.
func (i ElasticsearchOpenTelemetry) shouldRecordRequestBody(endpoint string) bool {
// allow list of endpoints that will propagate query to OpenTelemetry.
// see https://opentelemetry.io/docs/specs/semconv/database/elasticsearch/#call-level-attributes
var searchEndpoints = map[string]struct{}{
"search": {},
"async_search.submit": {},
"msearch": {},
"eql.search": {},
"terms_enum": {},
"search_template": {},
"msearch_template": {},
"render_search_template": {},
}
if i.recordBody {
if _, ok := searchEndpoints[endpoint]; ok {
return true
}
}
return false
}
// RecordRequestBody add the db.statement attributes only for search endpoints.
// Returns a new reader if the query has been recorded, nil otherwise.
func (i ElasticsearchOpenTelemetry) RecordRequestBody(ctx context.Context, endpoint string, query io.Reader) io.ReadCloser {
if i.shouldRecordRequestBody(endpoint) == false {
return nil
}
span := trace.SpanFromContext(ctx)
if span.IsRecording() {
buf := bytes.Buffer{}
buf.ReadFrom(query)
span.SetAttributes(attribute.String(attrDbStatement, buf.String()))
getBody := func() (io.ReadCloser, error) {
reader := buf
return io.NopCloser(&reader), nil
}
reader, _ := getBody()
return reader
}
return nil
}
// RecordError sets any provided error as an OTel error in the active span.
func (i ElasticsearchOpenTelemetry) RecordError(ctx context.Context, err error) {
span := trace.SpanFromContext(ctx)
if span.IsRecording() {
span.SetStatus(codes.Error, "an error happened while executing a request")
span.RecordError(err)
}
}
// RecordPathPart sets the couple for a specific path part.
// An index placed in the path would translate to `db.elasticsearch.path_parts.index`.
func (i ElasticsearchOpenTelemetry) RecordPathPart(ctx context.Context, pathPart, value string) {
span := trace.SpanFromContext(ctx)
if span.IsRecording() {
span.SetAttributes(attribute.String(attrPathParts+pathPart, value))
}
}
// BeforeRequest noop for interface.
func (i ElasticsearchOpenTelemetry) BeforeRequest(req *http.Request, endpoint string) {}
// AfterRequest enrich the span with the available data from the request.
func (i ElasticsearchOpenTelemetry) AfterRequest(req *http.Request, system, endpoint string) {
span := trace.SpanFromContext(req.Context())
if span.IsRecording() {
span.SetAttributes(
attribute.String(attrDbSystem, system),
attribute.String(attrDbOperation, endpoint),
attribute.String(attrHttpRequestMethod, req.Method),
attribute.String(attrUrlFull, req.URL.String()),
attribute.String(attrServerAddress, req.URL.Hostname()),
)
if value, err := strconv.ParseInt(req.URL.Port(), 10, 32); err == nil {
span.SetAttributes(attribute.Int64(attrServerPort, value))
}
}
}
// AfterResponse enric the span with the cluster id and node name if the query was executed on Elastic Cloud.
func (i ElasticsearchOpenTelemetry) AfterResponse(ctx context.Context, res *http.Response) {
span := trace.SpanFromContext(ctx)
if span.IsRecording() {
if id := res.Header.Get("X-Found-Handling-Cluster"); id != "" {
span.SetAttributes(
attribute.String(attrDbElasticsearchClusterName, id),
)
}
if name := res.Header.Get("X-Found-Handling-Instance"); name != "" {
span.SetAttributes(
attribute.String(attrDbElasticsearchNodeName, name),
)
}
}
}