// lib/instrumentation/modules/kafkajs.js
/*
* Copyright Elasticsearch B.V. and other contributors where applicable.
* Licensed under the BSD 2-Clause License; you may not use this file except in
* compliance with the BSD 2-Clause License.
*/
'use strict';
const { Buffer } = require('buffer');
const semver = require('semver');
const constants = require('../../constants');
const shimmer = require('../shimmer');
const { redactKeysFromObject } = require('../../filters/sanitize-field-names');
const NAME = 'Kafka';
const TYPE = 'messaging';
const SUBTYPE = 'kafka';
/**
* @typedef {{ Kafka: import('kafkajs').Kafka}} KafkaModule
* @typedef {(config: any) => Consumer} ConsumerFactory
* @typedef {import('kafkajs').Consumer} Consumer
* @typedef {import('kafkajs').ConsumerRunConfig} ConsumerRunConfig
* @typedef {(config: any) => Producer} ProducerFactory
* @typedef {import('kafkajs').Producer} Producer
* @typedef {import('kafkajs').ProducerRecord} ProducerRecord
*/
/**
* @param {KafkaModule} mod
* @param {any} agent
* @param {Object} options
* @param {string} options.version
* @param {boolean} options.enabled
*/
module.exports = function (mod, agent, { version, enabled }) {
if (!enabled || !semver.satisfies(version, '>=2 <3')) {
return mod;
}
const config = agent._conf;
const ins = agent._instrumentation;
agent.logger.debug('shimming Kafka.prototype.consumer');
shimmer.wrap(mod.Kafka.prototype, 'consumer', wrapConsumer);
agent.logger.debug('shimming Kafka.prototype.producer');
shimmer.wrap(mod.Kafka.prototype, 'producer', wrapProducer);
return mod;
/**
 * Patches `Kafka.consumer` so that every consumer it creates has its `run`
 * method wrapped, which lets the agent instrument message handling.
 *
 * @param {ConsumerFactory} origConsumer
 * @returns {ConsumerFactory}
 */
function wrapConsumer(origConsumer) {
  return function wrappedConsumer() {
    const newConsumer = origConsumer.apply(this, arguments);
    shimmer.wrap(newConsumer, 'run', wrapConsumerRun);
    return newConsumer;
  };
}
/**
 * Patches `Consumer.run` so the user-supplied `eachMessage` and `eachBatch`
 * handlers (when present) are instrumented before the run starts.
 *
 * @param {Consumer['run']} origRun
 * @returns {Consumer['run']}
 */
function wrapConsumerRun(origRun) {
  return function wrappedConsumerRun() {
    const runConfig = arguments[0];
    // Wrap whichever handler styles the caller provided.
    const wrappers = [
      ['eachMessage', wrapEachMessage],
      ['eachBatch', wrapEachBatch],
    ];
    for (const [prop, wrapper] of wrappers) {
      if (typeof runConfig[prop] === 'function') {
        shimmer.wrap(runConfig, prop, wrapper);
      }
    }
    return origRun.apply(this, arguments);
  };
}
/**
 * Returns the instrumented version of `eachMessage` which
 * - starts a transaction for each message handled
 * - continues the trace from `traceparent`/`tracestate` message headers
 *
 * @param {ConsumerRunConfig['eachMessage']} origEachMessage
 * @returns {ConsumerRunConfig['eachMessage']}
 */
function wrapEachMessage(origEachMessage) {
  // kafkajs header values may be a string or a Buffer:
  // https://github.com/tulios/kafkajs/blob/ff3b1117f316d527ae170b550bc0f772614338e9/types/index.d.ts#L148
  // Normalize to a string; anything else yields `undefined`.
  const headerToString = (value) => {
    if (typeof value === 'string') {
      return value;
    }
    if (value instanceof Buffer) {
      return value.toString('utf-8');
    }
    return undefined;
  };

  return async function (payload) {
    const { topic, message } = payload;
    if (shouldIgnoreTopic(topic, config)) {
      return origEachMessage.apply(this, arguments);
    }

    // For distributed tracing this instrumentation checks the headers
    // defined by OpenTelemetry and ignores the proprietary
    // `elasticapmtraceparent` header.
    // https://github.com/elastic/apm/blob/main/specs/agents/tracing-distributed-tracing.md#binary-fields
    const headers = message.headers || {};
    const opts = {};
    const childOf = headerToString(headers.traceparent);
    if (childOf !== undefined) {
      opts.childOf = childOf;
    }
    const tracestate = headerToString(headers.tracestate);
    if (tracestate !== undefined) {
      opts.tracestate = tracestate;
    }

    const trans = ins.startTransaction(
      `${NAME} RECEIVE from ${topic}`,
      TYPE,
      opts,
    );

    const messageCtx = { queue: { name: topic } };
    if (
      config.captureBody === 'all' ||
      config.captureBody === 'transactions'
    ) {
      messageCtx.body = message.value?.toString();
    }
    if (config.captureHeaders && message.headers) {
      // Redact sensitive header names, then stringify any Buffer values
      // that survived redaction.
      const redacted = redactKeysFromObject(
        message.headers,
        config.sanitizeFieldNamesRegExp,
      );
      for (const key of Object.keys(redacted)) {
        const value = redacted[key];
        if (value instanceof Buffer) {
          redacted[key] = value.toString('utf-8');
        }
      }
      messageCtx.headers = redacted;
    }
    if (message.timestamp) {
      messageCtx.age = { ms: Date.now() - Number(message.timestamp) };
    }
    trans.setMessageContext(messageCtx);

    let err;
    try {
      return await origEachMessage.apply(this, arguments);
    } catch (ex) {
      // Keep the error for the `finally` outcome; re-throw so consumer
      // code flow is unchanged.
      err = ex;
      throw ex;
    } finally {
      trans.setOutcome(
        err ? constants.OUTCOME_FAILURE : constants.OUTCOME_SUCCESS,
      );
      trans.end();
    }
  };
}
/**
 * Returns the instrumented version of `eachBatch` which
 * - creates a transaction each time it is called
 * - if trace context is present in messages, links them to the transaction
 *
 * @param {ConsumerRunConfig['eachBatch']} origEachBatch
 * @returns {ConsumerRunConfig['eachBatch']}
 */
function wrapEachBatch(origEachBatch) {
  return async function ({ batch }) {
    if (shouldIgnoreTopic(batch.topic, config)) {
      return origEachBatch.apply(this, arguments);
    }
    const trans = ins.startTransaction(
      `${NAME} RECEIVE from ${batch.topic}`,
      TYPE,
    );
    const messageCtx = { queue: { name: batch.topic } };
    trans.setMessageContext(messageCtx);
    const serviceContext = {
      framework: { name: 'Kafka' },
    };
    trans.setServiceContext(serviceContext);
    // Extract span links from up to MAX_MESSAGES_PROCESSED_FOR_TRACE_CONTEXT
    // messages in this batch.
    // https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-messaging.md#receiving-trace-context
    // A span link is created from a `traceparent` header in a message
    // (duplicate traceparents yield a single link).
    const messages = batch && batch.messages;
    if (messages) {
      const traceparentsSeen = new Set();
      const links = [];
      const limit = Math.min(
        messages.length,
        constants.MAX_MESSAGES_PROCESSED_FOR_TRACE_CONTEXT,
      );
      // Fix: bound the *scan* to `limit` messages. The previous loop walked
      // every message in the batch and only capped the number of links
      // produced, so batches full of linkless or duplicate messages were
      // scanned in full, contrary to the spec's processing limit.
      for (let i = 0; i < limit; i++) {
        const msg = messages[i];
        const traceparent =
          msg.headers &&
          msg.headers.traceparent &&
          msg.headers.traceparent.toString();
        if (traceparent && !traceparentsSeen.has(traceparent)) {
          links.push({ context: traceparent });
          traceparentsSeen.add(traceparent);
        }
      }
      trans.addLinks(links);
    }
    let result, err;
    try {
      result = await origEachBatch.apply(this, arguments);
    } catch (ex) {
      // Save the error for use in `finally` below, but re-throw it to
      // not impact code flow.
      err = ex;
      throw ex;
    } finally {
      trans.setOutcome(
        err ? constants.OUTCOME_FAILURE : constants.OUTCOME_SUCCESS,
      );
      trans.end();
    }
    return result;
  };
}
/**
 * Patches `Kafka.producer` so that every producer it creates has its `send`
 * and `sendBatch` methods wrapped to instrument message publishing.
 *
 * @param {ProducerFactory} origProducer
 * @returns {ProducerFactory}
 */
function wrapProducer(origProducer) {
  return function wrappedProducer() {
    const newProducer = origProducer.apply(this, arguments);
    const patches = {
      send: wrapProducerSend,
      sendBatch: wrapProducerSendBatch,
    };
    for (const [method, wrapper] of Object.entries(patches)) {
      shimmer.wrap(newProducer, method, wrapper);
    }
    return newProducer;
  };
}
/**
 * Returns the instrumented version of `send` which
 * - creates an exit span each time it is called
 * - propagates trace context through message headers
 *
 * @param {Producer['send']} origSend
 * @returns {Producer['send']}
 */
function wrapProducerSend(origSend) {
  // Carrier setter used for W3C trace-context injection; the agent's
  // proprietary `elastic-` prefixed headers are deliberately skipped.
  const setHeader = function (carrier, name, value) {
    if (name.startsWith('elastic-')) {
      return;
    }
    carrier[name] = value;
  };

  return async function (record) {
    const { topic } = record;
    const span = shouldIgnoreTopic(topic, config)
      ? undefined
      : ins.createSpan(`${NAME} SEND to ${topic}`, TYPE, SUBTYPE, 'send', {
          exitSpan: true,
        });

    // W3C trace-context propagation: use our new span when we created one,
    // otherwise whatever span/transaction is current.
    const runContext = ins.currRunContext();
    const parentSpan =
      span || runContext.currSpan() || runContext.currTransaction();
    if (parentSpan) {
      for (const msg of record.messages) {
        const headersCopy = Object.assign({}, msg.headers);
        parentSpan.propagateTraceContextHeaders(headersCopy, setHeader);
        msg.headers = headersCopy;
      }
    }

    if (!span) {
      return origSend.apply(this, arguments);
    }

    // We do not add headers or body because:
    // - `record.messages` is a list
    // - spec says is for transactions (https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-messaging.md#transaction-context-fields)
    span.setMessageContext({ queue: { name: topic } });
    span._setDestinationContext({
      service: {
        resource: `${SUBTYPE}/${topic}`,
        type: SUBTYPE,
        name: topic,
      },
    });

    let err;
    try {
      return await origSend.apply(this, arguments);
    } catch (ex) {
      // Keep the error for the `finally` outcome; re-throw so caller
      // code flow is unchanged.
      err = ex;
      throw ex;
    } finally {
      span.setOutcome(
        err ? constants.OUTCOME_FAILURE : constants.OUTCOME_SUCCESS,
      );
      span.end();
    }
  };
}
/**
 * Returns the patched version of `Producer.sendBatch` which
 * - creates an exit span for the operation
 * - propagates trace context via message headers
 *
 * Span naming: when the batch contains exactly one non-ignored topic the
 * span is named `Kafka SEND to <topic>` and carries per-topic context;
 * with several non-ignored topics the span is the generic `Kafka SEND`.
 * When every topic is ignored no span is created at all.
 *
 * @param {Producer['sendBatch']} origSendBatch
 * @returns {Producer['sendBatch']}
 */
function wrapProducerSendBatch(origSendBatch) {
return async function (batch) {
let span;
// The single non-ignored topic, if there is exactly one; used for the
// span name suffix and message/service-target context. Stays undefined
// when zero or more than one non-ignored topics are present.
let topicForContext;
// Becomes false as soon as any topic in the batch is not ignored.
let shouldIgnoreBatch = true;
const messages = batch.topicMessages || [];
const topics = new Set();
// Remove possible topic duplications
for (const msg of messages) {
topics.add(msg.topic);
}
for (const t of topics) {
const topicIgnored = shouldIgnoreTopic(t, config);
shouldIgnoreBatch = shouldIgnoreBatch && topicIgnored;
// When a topic is not ignored we keep a copy for context unless
// we find a 2nd topic also not ignored.
if (!topicIgnored) {
if (topicForContext) {
// Second non-ignored topic: no single topic represents the batch.
// Breaking early is safe — `shouldIgnoreBatch` is already false.
topicForContext = undefined;
break;
}
topicForContext = t;
}
}
if (!shouldIgnoreBatch) {
const suffix = topicForContext ? ` to ${topicForContext}` : '';
span = ins.createSpan(`${NAME} SEND${suffix}`, TYPE, SUBTYPE, 'send', {
exitSpan: true,
});
}
// W3C trace-context propagation. Headers are injected from our new span
// when one was created, otherwise from the current span/transaction, so
// downstream consumers stay linked even for ignored topics.
const runContext = ins.currRunContext();
const parentSpan =
span || runContext.currSpan() || runContext.currTransaction();
if (parentSpan && batch.topicMessages) {
batch.topicMessages.forEach((topicMessage) => {
topicMessage.messages.forEach((msg) => {
// Copy headers before mutating so shared header objects are not
// modified in place; skip the agent's proprietary `elastic-` headers.
const newHeaders = Object.assign({}, msg.headers);
parentSpan.propagateTraceContextHeaders(
newHeaders,
function (carrier, name, value) {
if (name.startsWith('elastic-')) {
return;
}
carrier[name] = value;
},
);
msg.headers = newHeaders;
});
});
}
if (!span) {
// Every topic ignored: forward the call uninstrumented.
return origSendBatch.apply(this, arguments);
}
if (topicForContext) {
// We do not add headers or body because:
// - `record.messages` is a list
// - spec says is for transactions (https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-messaging.md#transaction-context-fields)
span.setMessageContext({ queue: { name: topicForContext } });
}
// NOTE: `topicForContext` may be undefined here (multi-topic batch),
// which yields a service target without a topic name.
span.setServiceTarget(SUBTYPE, topicForContext);
let result, err;
try {
result = await origSendBatch.apply(this, arguments);
} catch (ex) {
// Save the error for use in `finally` below, but re-throw it to
// not impact code flow.
err = ex;
throw ex;
} finally {
// Always record the outcome and end the span, success or failure.
span.setOutcome(
err ? constants.OUTCOME_FAILURE : constants.OUTCOME_SUCCESS,
);
span.end();
}
return result;
};
}
};
/**
 * Returns true if we have to ignore messages on the given topic.
 *
 * @param {string} topic the topic where client is publishing/subscribing
 * @param {{ ignoreMessageQueuesRegExp: RegExp[] }} config the agent's configuration object
 * @returns {boolean} true when any configured pattern matches the topic
 */
function shouldIgnoreTopic(topic, config) {
  const rules = config.ignoreMessageQueuesRegExp;
  // `some` short-circuits on the first match, just like the early return
  // in a manual loop would.
  return Boolean(rules) && rules.some((rule) => rule.test(topic));
}