in lib/instrumentation/modules/kafkajs.js [368:456]
/**
 * Build an instrumented replacement for a producer `sendBatch` function.
 *
 * The returned async function:
 *   1. collects the distinct topics found in `batch.topicMessages` and checks
 *      each one with `shouldIgnoreTopic()`;
 *   2. creates a "send" exit span when at least one topic is not ignored. The
 *      span name and message context carry the topic name only when exactly
 *      one non-ignored topic is present;
 *   3. replaces each message's `headers` with a copy to which the new span
 *      (or else the current span / current transaction) has added its
 *      trace-context headers;
 *   4. calls `origSendBatch` with the same `this` and arguments, records the
 *      outcome on the span and ends it.
 *
 * Malformed input (no `batch`, `topicMessages` that is not an array, an entry
 * without a `messages` array) is never dereferenced here: it is handed to
 * `origSendBatch` untouched so that the wrapped function reports its own
 * validation error instead of a `TypeError` raised by the instrumentation.
 *
 * @param {Function} origSendBatch - The original function to wrap.
 * @returns {Function} Async wrapper resolving/rejecting exactly like
 *     `origSendBatch` does.
 */
function wrapProducerSendBatch(origSendBatch) {
  return async function (batch) {
    const topicMessages =
      batch && Array.isArray(batch.topicMessages) ? batch.topicMessages : [];

    // A Set removes possible topic duplications.
    const topics = new Set();
    for (const topicMessage of topicMessages) {
      if (topicMessage) {
        topics.add(topicMessage.topic);
      }
    }

    let shouldIgnoreBatch = true;
    let topicForContext;
    for (const topic of topics) {
      if (shouldIgnoreTopic(topic, config)) {
        continue;
      }
      shouldIgnoreBatch = false;
      // Keep the topic for the span context unless a 2nd non-ignored topic
      // shows up; in that case no single topic describes the batch.
      if (topicForContext) {
        topicForContext = undefined;
        break;
      }
      topicForContext = topic;
    }

    let span;
    if (!shouldIgnoreBatch) {
      const suffix = topicForContext ? ` to ${topicForContext}` : '';
      span = ins.createSpan(`${NAME} SEND${suffix}`, TYPE, SUBTYPE, 'send', {
        exitSpan: true,
      });
    }

    // W3C trace-context propagation.
    const runContext = ins.currRunContext();
    const parentSpan =
      span || runContext.currSpan() || runContext.currTransaction();
    if (parentSpan) {
      // Header names starting with `elastic-` are not written to messages.
      const setHeader = function (carrier, name, value) {
        if (name.startsWith('elastic-')) {
          return;
        }
        carrier[name] = value;
      };
      for (const topicMessage of topicMessages) {
        // Entries without a message list are left for `origSendBatch` to
        // validate; throwing here would mask its error and leak the span.
        if (!topicMessage || !Array.isArray(topicMessage.messages)) {
          continue;
        }
        for (const msg of topicMessage.messages) {
          if (!msg) {
            continue;
          }
          // Work on a copy so a headers object shared between messages (or
          // owned by the caller) is not modified in place.
          const newHeaders = Object.assign({}, msg.headers);
          parentSpan.propagateTraceContextHeaders(newHeaders, setHeader);
          msg.headers = newHeaders;
        }
      }
    }

    // No span: the batch is ignored or `createSpan` did not return one.
    if (!span) {
      return origSendBatch.apply(this, arguments);
    }

    if (topicForContext) {
      // We do not add headers or body because:
      // - `record.messages` is a list
      // - spec says is for transactions (https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-messaging.md#transaction-context-fields)
      span.setMessageContext({ queue: { name: topicForContext } });
    }
    span.setServiceTarget(SUBTYPE, topicForContext);

    try {
      const result = await origSendBatch.apply(this, arguments);
      span.setOutcome(constants.OUTCOME_SUCCESS);
      return result;
    } catch (err) {
      // Any rejection is a failure, even one with a falsy reason. The error
      // is re-thrown unchanged to not impact code flow.
      span.setOutcome(constants.OUTCOME_FAILURE);
      throw err;
    } finally {
      span.end();
    }
  };
}