def _sw_callback_func()

in skywalking/plugins/sw_rabbitmq.py [0:0]


def _sw_callback_func(callback):
    """Wrap a consumer ``callback`` so each delivery runs inside an entry span.

    The returned function has the same ``(this, method, properties, body)``
    signature as ``callback``. For every call it:

    * resolves the broker address from ``method._sw_peer`` or, failing that,
      from ``this.connection._impl.params`` (``'<unavailable>'`` if neither
      is present);
    * fills a ``Carrier`` from ``properties.headers`` so the span can join
      the trace propagated by the sender;
    * opens an MQ entry span tagged with broker, topic (exchange) and queue
      (routing key), invokes ``callback`` inside it and returns its result.

    Exceptions raised by ``callback`` propagate to the caller unchanged.
    """

    def _sw_callback(this, method, properties, body):
        peer = getattr(method, '_sw_peer', None)

        if peer is None:
            # Fall back to the connection parameters when no peer was
            # recorded on the method frame.
            params = getattr(getattr(this.connection, '_impl', None), 'params', None)
            peer = '<unavailable>' if params is None else f'{params.host}:{params.port}'

        context = get_context()
        # Normalise missing values to '' up front. The original applied
        # ``or ''`` to the whole concatenated operation name, where it could
        # never take effect because ``+`` binds tighter than ``or``.
        exchange = method.exchange or ''
        routing_key = method.routing_key or ''

        carrier = Carrier()
        # Read the headers once; they may be None when the message carries
        # no application headers.
        headers = properties.headers
        if headers is not None:
            for item in carrier:
                try:
                    if item.key in headers:
                        item.val = headers[item.key]
                except TypeError:
                    # Best effort: a malformed header container or value
                    # must not prevent the message from being consumed.
                    pass

        op = f'RabbitMQ/Topic/{exchange}/Queue/{routing_key}/Consumer'
        with context.new_entry_span(op=op, carrier=carrier) as span:
            span.layer = Layer.MQ
            span.component = Component.RabbitmqConsumer
            # Tag before invoking the user callback so the span still
            # carries broker/topic/queue details when the callback raises.
            span.tag(TagMqBroker(peer))
            span.tag(TagMqTopic(exchange))
            span.tag(TagMqQueue(routing_key))
            res = callback(this, method, properties, body)

        return res

    return _sw_callback