in skywalking/plugins/sw_rabbitmq.py [0:0]
def _sw_callback_func(callback):
def _sw_callback(this, method, properties, body):
peer = getattr(method, '_sw_peer', None)
if peer is None:
params = getattr(getattr(this.connection, '_impl', None), 'params', None)
peer = '<unavailable>' if params is None else f'{params.host}:{params.port}'
context = get_context()
exchange = method.exchange
routing_key = method.routing_key
carrier = Carrier()
for item in carrier:
try:
if item.key in properties.headers:
item.val = properties.headers[item.key]
except TypeError:
pass
with context.new_entry_span(op='RabbitMQ/Topic/' + exchange + '/Queue/' + routing_key
+ '/Consumer' or '', carrier=carrier) as span:
span.layer = Layer.MQ
span.component = Component.RabbitmqConsumer
res = callback(this, method, properties, body)
span.tag(TagMqBroker(peer))
span.tag(TagMqTopic(exchange))
span.tag(TagMqQueue(routing_key))
return res
return _sw_callback