in skywalking/plugins/sw_rabbitmq.py [0:0]
def _sw_basic_publish_func(_basic_publish):
    """Build a traced replacement for a channel's ``basic_publish``.

    :param _basic_publish: the publish callable being wrapped. It is invoked
        as ``_basic_publish(this, exchange, routing_key, body,
        properties=..., mandatory=...)``.
    :return: ``_sw_basic_publish``, a wrapper taking the same parameters that
        runs the wrapped call inside an exit span.
    """
    def _sw_basic_publish(this, exchange,
                          routing_key,
                          body,
                          properties=None,
                          mandatory=False):
        """Publish via the wrapped callable inside a RabbitMQ producer exit span.

        The items of the span's carrier are copied into ``properties.headers``
        before the message is published. A caller-supplied ``properties``
        object is modified in place (its ``headers`` mapping gains the
        carrier entries); when ``properties`` is ``None`` a fresh
        ``pika.BasicProperties`` is created to hold them.

        :param this: the channel object; ``this.connection.params.host`` and
            ``.port`` are read to build the peer address.
        :param exchange: exchange name, forwarded unchanged and used in the
            span operation name and topic tag.
        :param routing_key: routing key, forwarded unchanged and used in the
            span operation name and queue tag.
        :param body: message body, forwarded unchanged.
        :param properties: optional message properties carrying ``headers``.
        :param mandatory: forwarded unchanged.
        :return: whatever the wrapped ``_basic_publish`` returns.
        """
        # Imported at call time, so this module can be loaded without pika.
        import pika

        peer = f'{this.connection.params.host}:{this.connection.params.port}'
        context = get_context()
        # The operation name always contains its literal segments, so it can
        # never be empty and needs no fallback value.
        operation = f'RabbitMQ/Topic/{exchange}/Queue/{routing_key}/Producer'

        with context.new_exit_span(op=operation, peer=peer,
                                   component=Component.RabbitmqProducer) as span:
            carrier = span.inject()
            span.layer = Layer.MQ

            # Tag before publishing so the span is still tagged if the
            # wrapped publish raises.
            span.tag(TagMqBroker(peer))
            span.tag(TagMqTopic(exchange))
            span.tag(TagMqQueue(routing_key))

            if properties is None:
                properties = pika.BasicProperties()
            if properties.headers is None:
                properties.headers = {}
            # Propagate the trace context to the consumer via message headers.
            for item in carrier:
                properties.headers[item.key] = item.val

            return _basic_publish(this, exchange,
                                  routing_key,
                                  body,
                                  properties=properties,
                                  mandatory=mandatory)

    return _sw_basic_publish