in lib/instrumentation/modules/kafkajs.js [99:184]
/**
 * Wrap a kafkajs `eachMessage` handler so that each handled message runs
 * inside its own transaction.
 *
 * Messages whose topic is matched by `shouldIgnoreTopic` are forwarded to the
 * original handler without starting a transaction.
 *
 * @param {Function} origEachMessage - The original `eachMessage` handler.
 * @returns {Function} An async handler taking the same payload. It resolves
 *   with the original handler's result and re-throws anything the original
 *   handler throws, after the transaction has been ended.
 */
function wrapEachMessage(origEachMessage) {
  // According to `kafkajs` types a header value might be a string or Buffer
  // https://github.com/tulios/kafkajs/blob/ff3b1117f316d527ae170b550bc0f772614338e9/types/index.d.ts#L148
  // Any other kind of value is treated as "header not usable".
  const headerToString = (value) => {
    if (typeof value === 'string') {
      return value;
    }
    if (value instanceof Buffer) {
      return value.toString('utf-8');
    }
    return undefined;
  };

  return async function (payload) {
    const { topic, message } = payload;
    if (shouldIgnoreTopic(topic, config)) {
      return origEachMessage.apply(this, arguments);
    }

    // For distributed tracing this instrumentation is going to check
    // the headers defined by opentelemetry and ignore the proprietary
    // `elasticapmtraceparent` header
    // https://github.com/elastic/apm/blob/main/specs/agents/tracing-distributed-tracing.md#binary-fields
    const { headers } = message;
    const opts = {};
    const traceparent = headerToString(headers?.traceparent);
    if (traceparent !== undefined) {
      opts.childOf = traceparent;
    }
    const tracestate = headerToString(headers?.tracestate);
    if (tracestate !== undefined) {
      opts.tracestate = tracestate;
    }

    const trans = ins.startTransaction(
      `${NAME} RECEIVE from ${topic}`,
      TYPE,
      opts,
    );

    const messageCtx = { queue: { name: topic } };
    if (
      config.captureBody === 'all' ||
      config.captureBody === 'transactions'
    ) {
      messageCtx.body = message.value?.toString();
    }
    if (headers && config.captureHeaders) {
      // Make sure there is no sensitive data
      // and transform non-redacted buffers
      messageCtx.headers = redactKeysFromObject(
        headers,
        config.sanitizeFieldNamesRegExp,
      );
      for (const key of Object.keys(messageCtx.headers)) {
        const value = messageCtx.headers[key];
        if (value instanceof Buffer) {
          messageCtx.headers[key] = value.toString('utf-8');
        }
      }
    }
    if (message.timestamp) {
      messageCtx.age = {
        ms: Date.now() - Number(message.timestamp),
      };
    }
    trans.setMessageContext(messageCtx);

    // Start pessimistic and flip to success only once the handler has
    // resolved. Deriving the outcome from the truthiness of the caught value
    // would report a rejection with a falsy reason (e.g. `Promise.reject()`)
    // as a success.
    let outcome = constants.OUTCOME_FAILURE;
    try {
      const result = await origEachMessage.apply(this, arguments);
      outcome = constants.OUTCOME_SUCCESS;
      return result;
    } finally {
      // Runs on both paths; a thrown error keeps propagating to the caller
      // so the instrumentation does not alter code flow.
      trans.setOutcome(outcome);
      trans.end();
    }
  };
}