in src/plugins/AMQPLibPlugin.ts [85:119]
/**
 * Replacement for `BaseChannel.prototype.dispatchMessage` that surrounds the
 * delegated `_dispatchMessage` call with a RabbitMQ consumer entry span.
 *
 * The span's context carrier is built from `message.properties.headers`
 * (an empty object when absent). A missing message is tolerated: the
 * exchange and routing-key parts of the span name are then empty, and a
 * `null` message additionally gets a 'Cancel' log entry on the span.
 *
 * @param fields  Passed through untouched to `_dispatchMessage`.
 * @param message Delivered message, or `null`; passed through untouched.
 * @returns Whatever `_dispatchMessage` returns.
 * @throws Rethrows any error raised while decorating the span or while
 *         dispatching, after recording it on the span and stopping the span.
 */
BaseChannel.prototype.dispatchMessage = function (fields: any, message: any) {
  // Falsy values (including a missing message) collapse to the empty string.
  const exchange = message?.fields?.exchange || '';
  const routingKey = message?.fields?.routingKey || '';

  // Build the carrier first, then obtain the current context's entry span.
  const headers = message?.properties?.headers || {};
  const carrier = ContextCarrier.from(headers);
  const operation = 'RabbitMQ/' + exchange + '/' + routingKey + '/Consumer';
  const entrySpan = ContextManager.current.newEntrySpan(operation, carrier);

  entrySpan.start();

  try {
    entrySpan.component = Component.RABBITMQ_CONSUMER;
    entrySpan.layer = SpanLayer.MQ;

    // Looked up inside the try so a failure here is recorded on the span.
    const stream = this.connection.stream;
    entrySpan.peer = `${stream.remoteAddress}:${stream.remotePort}`;

    // A stream whose constructor is named 'Socket' is tagged as amqp://,
    // anything else as amqps://.
    const scheme = stream.constructor.name === 'Socket' ? 'amqp://' : 'amqps://';
    entrySpan.tag(Tag.mqBroker(scheme + entrySpan.peer));

    if (exchange) {
      entrySpan.tag(Tag.mqTopic(exchange));
    }
    if (routingKey) {
      entrySpan.tag(Tag.mqQueue(routingKey));
    }
    if (message === null) {
      entrySpan.log('Cancel', true);
    }

    // Delegate with the same receiver and arguments.
    const result = _dispatchMessage.call(this, fields, message);
    entrySpan.stop();
    return result;
  } catch (err) {
    entrySpan.error(err);
    entrySpan.stop();
    throw err;
  }
};