lib/instrumentation/modules/@elastic/elasticsearch.js (241 lines of code) (raw):

/* * Copyright Elasticsearch B.V. and other contributors where applicable. * Licensed under the BSD 2-Clause License; you may not use this file except in * compliance with the BSD 2-Clause License. */ 'use strict'; // Instrument the @elastic/elasticsearch module. // // Limitations: // - In @elastic/elasticsearch >=7.14 <8, the diagnostic events sent for ES // spans started before the product-check is finished will have an incorrect // `currentSpan`. // // An Elasticsearch (ES) request typically results in a single HTTP request to // the server. For some of the later 7.x versions of @elastic/elasticsearch // there is a product-check "GET /" that blocks the *first* request to the // server. The handling of ES requests are effectively queued until that // product-check is complete. When they *do* run, the async context is that // of the initial ES span. This means that `apm.currentSpan` inside an ES // client diagnostic event for these queued ES requests will be wrong. // Currently the APM agent is not patching for this. const semver = require('semver'); const { URL, URLSearchParams } = require('url'); const { getDBDestination } = require('../../context'); const { getElasticsearchDbStatement } = require('../../elasticsearch-shared'); const shimmer = require('../../shimmer'); /** * Get the Elasticsearch cluster name, if possible. * * This is currently a partial implementation of: * https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-db.md#cluster-name * * @param {import("@elastic/elasticsearch").DiagnosticResult || null} * @returns { string || null } */ function getESClusterName(diagResult) { if (diagResult && diagResult.headers) { const clusterNameFromHeader = diagResult.headers['x-found-handling-cluster']; if (clusterNameFromHeader) { return clusterNameFromHeader; } } } module.exports = function (elasticsearch, agent, { version, enabled }) { if (!enabled) { return elasticsearch; } if (!elasticsearch.Client) { agent.logger.debug( '@elastic/elasticsearch@%s is not supported (no `elasticsearch.Client`) - aborting...', version, ); return elasticsearch; } // Before v7.7.0 the Transport#request() implementation's Promises support // would re-call `this.request(...)` inside a Promise. const doubleCallsRequestIfNoCb = semver.lt(version, '7.7.0'); const ins = agent._instrumentation; const isGteV8 = semver.satisfies(version, '>=8', { includePrerelease: true }); const elasticsearchCaptureBodyUrlsRegExp = agent._conf.elasticsearchCaptureBodyUrlsRegExp; agent.logger.debug( 'shimming elasticsearch.Transport.prototype.{request,getConnection}', ); shimmer.wrap( elasticsearch.Transport && elasticsearch.Transport.prototype, 'request', wrapRequest, ); shimmer.wrap( elasticsearch.Transport && elasticsearch.Transport.prototype, 'getConnection', wrapGetConnection, ); shimmer.wrap(elasticsearch, 'Client', wrapClient); // Tracking the ES client Connection object and DiagnosticResult for each // active span. Use WeakMap to avoid a leak from possible spans that don't // end. const connFromSpan = new WeakMap(); const diagResultFromSpan = new WeakMap(); return elasticsearch; function wrapClient(OrigClient) { class ClientTraced extends OrigClient { constructor(...args) { super(...args); const diagnostic = isGteV8 ? this.diagnostic : this; diagnostic.on('response', (_err, result) => { if (result) { const currSpan = ins.currSpan(); if (currSpan) { diagResultFromSpan.set(currSpan, result); } } }); } } return ClientTraced; } // Transport#request() calls Transport#getConnection() when it is ready to // make the HTTP request. This returns the actual connection to be used for // the request. This is limited, however: // - `getConnection()` is not called if the request was aborted early. // - If all connections are marked dead, then this returns null. // - We are assuming this is called with the correct async context. See // "Limitations" above. function wrapGetConnection(origGetConnection) { return function wrappedGetConnection(opts) { const conn = origGetConnection.apply(this, arguments); const currSpan = ins.currSpan(); if (conn && currSpan) { connFromSpan.set(currSpan, conn); } return conn; }; } function wrapRequest(origRequest) { return function wrappedRequest(params, options, cb) { options = options || {}; if (typeof options === 'function') { cb = options; options = {}; } if (typeof cb !== 'function' && doubleCallsRequestIfNoCb) { return origRequest.apply(this, arguments); } const method = (params && params.method) || '<UnknownMethod>'; const path = (params && params.path) || '<UnknownPath>'; agent.logger.debug( { method, path }, 'intercepted call to @elastic/elasticsearch.Transport.prototype.request', ); const span = ins.createSpan( `Elasticsearch: ${method} ${path}`, 'db', 'elasticsearch', 'request', { exitSpan: true }, ); if (!span) { return origRequest.apply(this, arguments); } const parentRunContext = ins.currRunContext(); const spanRunContext = parentRunContext.enterSpan(span); const finish = ins.bindFunctionToRunContext( spanRunContext, (err, result) => { // Set destination context. // Use the connection from wrappedGetConnection() above, if that worked. // Otherwise, fallback to using the first connection on // `Transport#connectionPool`, if any. (This is the best parsed // representation of connection options passed to the Client ctor.) let conn = connFromSpan.get(span); if (conn) { connFromSpan.delete(span); } else if (this.connectionPool && this.connectionPool.connections) { conn = this.connectionPool.connections[0]; } const connUrl = conn && conn.url; span._setDestinationContext( getDBDestination( connUrl && connUrl.hostname, connUrl && connUrl.port, ), ); // Gather some HTTP context. // We are *not* including the response headers b/c they are boring: // // X-elastic-product: Elasticsearch // content-type: application/json // content-length: ... // // Getting the ES client request "DiagnosticResult" object has some edge cases: // - In v7 using a callback, we always get it as `result`. // - In v7 using a Promise, if the promise is rejected, then `result` is // not passed. // - In v8, `result` only includes HTTP response info if `options.meta` // is true. We use the diagnostic 'response' event instead. // - In v7, see the limitation note above for the rare start case where // the diagnostic 'response' event may have the wrong currentSpan. // The result is that with Promise usage of v7, ES client requests that // are queued behind the "product-check" and that reject, won't have a // `diagResult`. const httpContext = {}; let haveHttpContext = false; let diagResult = isGteV8 ? null : result; if (!diagResult) { diagResult = diagResultFromSpan.get(span); if (diagResult) { diagResultFromSpan.delete(span); } } if (diagResult) { if (diagResult.statusCode) { haveHttpContext = true; httpContext.status_code = diagResult.statusCode; } if (diagResult.headers && 'content-length' in diagResult.headers) { const contentLength = Number( diagResult.headers['content-length'], ); if (!isNaN(contentLength)) { haveHttpContext = true; httpContext.response = { encoded_body_size: contentLength }; } } } // Reconstruct the full URL (including query params). let origin; if (connUrl) { origin = connUrl.origin; } else if ( diagResult && diagResult.meta && diagResult.meta.connection && diagResult.meta.connection.url ) { try { origin = new URL(diagResult.meta.connection.url).origin; } catch (_ignoredErr) {} } if (origin && params && params.path) { const fullUrl = new URL(origin); fullUrl.pathname = params.path; fullUrl.search = new URLSearchParams(params.querystring).toString(); httpContext.url = fullUrl.toString(); haveHttpContext = true; } if (haveHttpContext) { span.setHttpContext(httpContext); } // Set DB context. const dbContext = { type: 'elasticsearch', }; if (params) { const statement = getElasticsearchDbStatement( params.path, params.body || params.bulkBody, elasticsearchCaptureBodyUrlsRegExp, ); if (statement) { dbContext.statement = statement; } } const clusterName = getESClusterName(diagResult); if (clusterName) { dbContext.instance = clusterName; } span.setDbContext(dbContext); if (err) { // Error properties are specified here: // https://github.com/elastic/elasticsearch-js/blob/master/lib/errors.d.ts // - We capture some data from ResponseError, which is for // Elasticsearch API errors: // https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#common-options-error-options // - Otherwise we explicitly turn off `captureAttributes` to avoid // grabbing potentially large and sensitive properties like // `err.data` on DeserializationError. const errOpts = { captureAttributes: false, }; const errBody = err.body; if (err.name === 'ResponseError' && errBody && errBody.error) { // Include some data from the Elasticsearch API response body: // https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#common-options-error-options const errType = errBody.error.type; if (errType) { // Specialize `error.exception.type` for better error grouping. errOpts.exceptionType = `ResponseError (${errType})`; } errOpts.custom = { type: errType, reason: errBody.error.reason, status: errBody.status, }; if (errBody.error.caused_by) { errOpts.custom.caused_by = errBody.error.caused_by; } } agent.captureError(err, errOpts); } span.end(); }, ); if (typeof cb === 'function') { const wrappedCb = (err, result) => { finish(err, result); ins.withRunContext(parentRunContext, cb, this, err, result); }; return ins.withRunContext( spanRunContext, origRequest, this, params, options, wrappedCb, ); } else { const origPromise = ins.withRunContext( spanRunContext, origRequest, this, ...arguments, ); origPromise.then( function onResolve(result) { finish(null, result); }, function onReject(err) { finish(err, null); }, ); return origPromise; } }; } };