BaseChannel.prototype.dispatchMessage = function()

in src/plugins/AMQPLibPlugin.ts [85:119]


    /**
     * Tracing wrapper assigned to `BaseChannel.prototype.dispatchMessage`.
     *
     * Creates an entry span named `RabbitMQ/<exchange>/<routingKey>/Consumer` from the
     * context carrier built out of the message headers, tags it with broker, topic and
     * queue, and then delegates to `_dispatchMessage` with the same `this` and arguments.
     * NOTE(review): `_dispatchMessage` is defined outside this excerpt — presumably the
     * original method captured before patching; confirm against the surrounding file.
     *
     * @param fields  Passed through unchanged to `_dispatchMessage`.
     * @param message Message being dispatched. Exchange, routing key and headers are read
     *                from it when present; a `null` message is logged on the span as `Cancel`.
     * @returns Whatever `_dispatchMessage` returns.
     * @throws Rethrows any error raised while tagging the span or dispatching, after
     *         recording it on the span.
     */
    BaseChannel.prototype.dispatchMessage = function (fields: any, message: any) {
      // Optional chaining keeps a null/undefined message from throwing here; missing
      // values fall back to empty strings / an empty header map.
      const topic = message?.fields?.exchange || '';
      const queue = message?.fields?.routingKey || '';
      const carrier = ContextCarrier.from(message?.properties?.headers || {});
      const span = ContextManager.current.newEntrySpan('RabbitMQ/' + topic + '/' + queue + '/Consumer', carrier);

      span.start();

      try {
        span.component = Component.RABBITMQ_CONSUMER;
        span.layer = SpanLayer.MQ;

        const stream = this.connection.stream;

        span.peer = `${stream.remoteAddress}:${stream.remotePort}`;

        // A stream whose constructor is named 'Socket' is reported as amqp://, anything else as amqps://.
        const scheme = stream.constructor.name === 'Socket' ? 'amqp://' : 'amqps://';

        span.tag(Tag.mqBroker(scheme + span.peer));

        if (topic) span.tag(Tag.mqTopic(topic));

        if (queue) span.tag(Tag.mqQueue(queue));

        if (message === null) span.log('Cancel', true);

        return _dispatchMessage.call(this, fields, message);
      } catch (e) {
        span.error(e);

        throw e;
      } finally {
        // Single exit point for closing the span: runs once on both the success and the
        // error path, so stop() is never invoked twice for the same dispatch.
        span.stop();
      }
    };