def _sw_basic_publish_func()

in skywalking/plugins/sw_rabbitmq.py [0:0]


def _sw_basic_publish_func(_basic_publish):
    """Build a traced replacement for a ``basic_publish`` callable.

    The returned wrapper opens an exit span around every publish, copies the
    span's propagation carrier into the message headers, and then delegates to
    the wrapped ``_basic_publish``.

    :param _basic_publish: the original publish callable; it is invoked as
        ``_basic_publish(this, exchange, routing_key, body,
        properties=..., mandatory=...)``.
    :return: the wrapping function, with the same parameters as the call above.
    """

    def _sw_basic_publish(this, exchange,
                          routing_key,
                          body,
                          properties=None,
                          mandatory=False):
        """Publish ``body`` inside an exit span and return the wrapped result.

        ``properties.headers`` is updated in place with the carrier entries, so
        a ``properties`` object supplied by the caller is mutated.
        Any exception raised by the wrapped publish propagates to the caller.
        """
        import pika

        peer = f'{this.connection.params.host}:{this.connection.params.port}'
        context = get_context()
        # The operation name always contains literal text, so it can never be
        # empty and needs no fallback value.
        op = f'RabbitMQ/Topic/{exchange}/Queue/{routing_key}/Producer'
        with context.new_exit_span(op=op, peer=peer,
                                   component=Component.RabbitmqProducer) as span:
            carrier = span.inject()
            span.layer = Layer.MQ
            # Tag before publishing so that a publish which raises still leaves
            # the broker/topic/queue information on the span.
            span.tag(TagMqBroker(peer))
            span.tag(TagMqTopic(exchange))
            span.tag(TagMqQueue(routing_key))

            if properties is None:
                properties = pika.BasicProperties()
            if properties.headers is None:
                properties.headers = {}
            # Carry the tracing context to the consumer via message headers.
            for item in carrier:
                properties.headers[item.key] = item.val

            return _basic_publish(this, exchange,
                                  routing_key,
                                  body,
                                  properties=properties,
                                  mandatory=mandatory)

    return _sw_basic_publish