lib/instrumentation/modules/cassandra-driver.js (229 lines of code) (raw):

/* * Copyright Elasticsearch B.V. and other contributors where applicable. * Licensed under the BSD 2-Clause License; you may not use this file except in * compliance with the BSD 2-Clause License. */ 'use strict'; const semver = require('semver'); const sqlSummary = require('sql-summary'); const shimmer = require('../shimmer'); module.exports = function (cassandra, agent, { version, enabled }) { if (!enabled) return cassandra; if (!semver.satisfies(version, '>=3 <5')) { agent.logger.debug( 'cassandra-driver version %s not supported - aborting...', version, ); return cassandra; } const ins = agent._instrumentation; if (cassandra.Client) { if (semver.gte(version, '4.4.0')) { // Prior to v4.4.0, the regular `connect` function would be called by the // other functions (e.g. `execute`). In newer versions an internal // `_connect` function is called instead (this is also called by // `connect`). shimmer.wrap(cassandra.Client.prototype, '_connect', wrapAsyncConnect); } else { shimmer.wrap(cassandra.Client.prototype, 'connect', wrapConnect); } shimmer.wrap(cassandra.Client.prototype, 'execute', wrapExecute); shimmer.wrap(cassandra.Client.prototype, 'eachRow', wrapEachRow); shimmer.wrap(cassandra.Client.prototype, 'batch', wrapBatch); } return cassandra; function wrapAsyncConnect(original) { return async function wrappedAsyncConnect() { const span = ins.createSpan( 'Cassandra: Connect', 'db', 'cassandra', 'connect', { exitSpan: true }, ); if (!span) { return original.apply(this, arguments); } const dbContext = { type: 'cassandra' }; if (this.keyspace) { dbContext.instance = this.keyspace; } span.setDbContext(dbContext); try { return await original.apply(this, arguments); } finally { span.end(); } }; } function wrapConnect(original) { return function wrappedConnect(callback) { const span = ins.createSpan( 'Cassandra: Connect', 'db', 'cassandra', 'connect', { exitSpan: true }, ); if (!span) { return original.apply(this, arguments); } const dbContext = { type: 'cassandra' }; if (this.keyspace) { dbContext.instance = this.keyspace; } span.setDbContext(dbContext); function resolve() { span.end(); } // Wrap the callback const ret = original.call(this, wrapCallback(callback)); if (typeof callback !== 'function') { if (typeof ret.then === 'function') { ret.then(resolve, resolve); } else { agent.logger.error( 'unable to identify span exit point for cassandra-driver', ); } } return ret; function wrapCallback(cb) { if (typeof cb !== 'function') return cb; return function wrappedCallback() { resolve(); return cb.apply(this, arguments); }; } }; } function toQueryString(query) { return query.query; } function wrapBatch(original) { return function wrappedBatch(queries, options, callback) { const span = ins.createSpan( 'Cassandra: Batch query', 'db', 'cassandra', 'query', { exitSpan: true }, ); if (!span) { return original.apply(this, arguments); } const queryStrings = queries.map(toQueryString); const query = queryStrings.join(';\n'); const dbContext = { type: 'cassandra', statement: query }; const keyspace = (options && typeof options === 'object' && options.keyspace) || this.keyspace; if (keyspace) { dbContext.instance = keyspace; } span.setDbContext(dbContext); function resolve() { span.end(); } // Wrap the callback const index = arguments.length - 1; const cb = arguments[index]; const isPromise = typeof cb !== 'function'; if (!isPromise) { arguments[index] = function wrappedCallback() { resolve(); return cb.apply(this, arguments); }; } const ret = original.apply(this, arguments); if (isPromise) { if (typeof ret.then === 'function') { ret.then(resolve, resolve); } else { agent.logger.error( 'unable to identify span exit point for cassandra-driver', ); } } return ret; }; } function wrapExecute(original) { return function wrappedExecute(query, params, options, callback) { const span = ins.createSpan(null, 'db', 'cassandra', 'query', { exitSpan: true, }); if (!span) { return original.apply(this, arguments); } span.name = sqlSummary(query); const dbContext = { type: 'cassandra', statement: query }; const keyspace = (options && typeof options === 'object' && options.keyspace) || this.keyspace; if (keyspace) { dbContext.instance = keyspace; } span.setDbContext(dbContext); function resolve() { span.end(); } // Wrap the callback const index = arguments.length - 1; const cb = arguments[index]; const isPromise = typeof cb !== 'function'; if (!isPromise) { arguments[index] = function wrappedCallback() { resolve(); return cb.apply(this, arguments); }; } const ret = original.apply(this, arguments); if (isPromise) { if (typeof ret.then === 'function') { ret.then(resolve, resolve); } else { agent.logger.error( 'unable to identify span exit point for cassandra-driver', ); } } return ret; }; } function wrapEachRow(original) { return function wrappedEachRow( query, params, options, rowCallback, callback, ) { const span = ins.createSpan(null, 'db', 'cassandra', 'query', { exitSpan: true, }); if (!span) { return original.apply(this, arguments); } span.name = sqlSummary(query); const dbContext = { type: 'cassandra', statement: query }; const keyspace = (options && typeof options === 'object' && options.keyspace) || this.keyspace; if (keyspace) { dbContext.instance = keyspace; } span.setDbContext(dbContext); // Wrap the callback const index = arguments.length - 1; const hasRowCallback = typeof arguments[index - 1] === 'function'; function resolve() { span.end(); } if (hasRowCallback) { const cb = arguments[index]; if (typeof cb === 'function') { arguments[index] = function wrappedCallback() { resolve(); return cb.apply(this, arguments); }; } else { agent.logger.error( 'unable to identify span exit point for cassandra-driver', ); } } else { arguments[index + 1] = resolve; arguments.length++; } return original.apply(this, arguments); }; } };