in skywalking/plugins/sw_rabbitmq.py [0:0]
def _sw__on_deliver_func(__on_deliver):
    """Build a tracing replacement for the delivery handler ``__on_deliver``.

    Args:
        __on_deliver: The original handler, invoked as
            ``__on_deliver(this, method_frame, header_frame, body)``.

    Returns:
        A function with the same four-argument signature. It either hands the
        delivery straight to the original handler (when the consumer is bound
        to a ``BlockingChannel``) or runs the original handler inside a
        RabbitMQ consumer entry span.
    """
    from pika.adapters.blocking_connection import BlockingChannel

    def _sw__on_deliver(this, method_frame, header_frame, body):
        """Trace one delivered message, then delegate to the original handler.

        Returns whatever the original handler returns.
        """
        method = method_frame.method
        peer = f'{this.connection.params.host}:{this.connection.params.port}'
        consumer_tag = method.consumer_tag

        # The following is a special case for one type of channel to allow any exit spans to be linked properly to
        # the incoming segment. Otherwise, if we create the span here the span ends before any user callbacks are
        # called and so any new spans will not be linked to the incoming message.
        defer_span = False
        try:  # future-proofing if object structure changes
            if consumer_tag not in this._cancelled and consumer_tag in this._consumers:
                consumer = this._consumers[consumer_tag]
                if isinstance(consumer.__self__, BlockingChannel):
                    # Stash the peer on the method object for later use.
                    # NOTE(review): presumably read by the hook that creates the deferred span - confirm.
                    method._sw_peer = peer
                    defer_span = True
        except Exception:
            # Deliberately best-effort: this pokes at private attributes, so any failure simply falls back to
            # creating the span here instead of deferring it.
            pass

        if defer_span:
            return __on_deliver(this, method_frame, header_frame, body)

        context = get_context()
        exchange = method.exchange
        routing_key = method.routing_key

        # Copy propagated trace values from the message headers into the carrier.
        carrier = Carrier()
        headers = header_frame.properties.headers
        for item in carrier:
            try:
                if item.key in headers:
                    item.val = headers[item.key]
            except TypeError:
                # Headers do not support membership tests / lookup (e.g. they are None); leave the item unset.
                pass

        op = 'RabbitMQ/Topic/' + exchange + '/Queue/' + routing_key + '/Consumer'
        with context.new_entry_span(op=op, carrier=carrier) as span:
            span.layer = Layer.MQ
            span.component = Component.RabbitmqConsumer
            # Tag before delegating so the span still carries its metadata if the handler raises.
            span.tag(TagMqBroker(peer))
            span.tag(TagMqTopic(exchange))
            span.tag(TagMqQueue(routing_key))
            return __on_deliver(this, method_frame, header_frame, body)

    return _sw__on_deliver