def _sw__on_deliver_func()

in skywalking/plugins/sw_rabbitmq.py [0:0]


def _sw__on_deliver_func(__on_deliver):
    from pika.adapters.blocking_connection import BlockingChannel

    def _sw__on_deliver(this, method_frame, header_frame, body):
        peer = f'{this.connection.params.host}:{this.connection.params.port}'
        consumer_tag = method_frame.method.consumer_tag

        # The following is a special case for one type of channel to allow any exit spans to be linked properly to the
        # incoming segment. Otherwise, if we create the span here the span ends before any oser callbacks are called and
        # so any new spans will not be linked to the incoming message.

        defer_span = False

        try:  # future-proofing if object structure changes
            if consumer_tag not in this._cancelled and consumer_tag in this._consumers:
                consumer = this._consumers[consumer_tag]

                if isinstance(consumer.__self__, BlockingChannel):
                    method_frame.method._sw_peer = peer
                    defer_span = True

        except Exception:
            pass

        if defer_span:
            return __on_deliver(this, method_frame, header_frame, body)

        context = get_context()
        exchange = method_frame.method.exchange
        routing_key = method_frame.method.routing_key
        carrier = Carrier()
        for item in carrier:
            try:
                if item.key in header_frame.properties.headers:
                    item.val = header_frame.properties.headers[item.key]
            except TypeError:
                pass

        with context.new_entry_span(op='RabbitMQ/Topic/' + exchange + '/Queue/' + routing_key
                                       + '/Consumer' or '', carrier=carrier) as span:
            span.layer = Layer.MQ
            span.component = Component.RabbitmqConsumer
            __on_deliver(this, method_frame, header_frame, body)
            span.tag(TagMqBroker(peer))
            span.tag(TagMqTopic(exchange))
            span.tag(TagMqQueue(routing_key))

    return _sw__on_deliver