# def install()
#
# in skywalking/plugins/sw_pulsar.py [0:0]


def install():
    """Patch the Pulsar client classes so that sends and receives are traced.

    Replaces ``Client.__init__``, ``Producer.send`` and ``Consumer.receive``
    with wrappers around the original implementations:

    * ``Client.__init__`` additionally records the ``service_url`` it was
      given; that value is used as the peer / broker tag of later spans.
    * ``Producer.send`` runs inside an exit span and adds the span's carrier
      items to the message properties before delegating to the original.
    * ``Consumer.receive`` delegates to the original and, for a truthy
      message, opens an entry span whose carrier is filled from the message
      properties.

    The peer is kept in one variable shared by every client, so it always
    holds the ``service_url`` of the most recently constructed ``Client``.
    """
    from pulsar import Client, Consumer, Producer

    # Keep references to the unpatched callables so the wrappers can delegate.
    __init = Client.__init__
    _send = Producer.send
    _receive = Consumer.receive
    _peer = ''

    def _sw_init(self, service_url, *args, **kwargs):
        """Run the original ``Client.__init__`` and then remember the URL."""
        nonlocal _peer
        __init(self, service_url, *args, **kwargs)
        # Only reached when the original constructor did not raise.
        _peer = service_url

    def _sw_send_func(_send):
        """Return a traced replacement for ``Producer.send`` wrapping ``_send``."""
        def _sw_send(this, content,
                     properties=None,
                     partition_key=None,
                     sequence_id=None,
                     replication_clusters=None,
                     disable_replication=False,
                     event_timestamp=None,
                     deliver_at=None,
                     deliver_after=None,
                     ):
            # Last path segment of the topic returned by the producer.
            topic = this._producer.topic().split('/')[-1]
            with get_context().new_exit_span(op=f'Pulsar/Topic/{topic}/Producer', peer=_peer,
                                             component=Component.PulsarProducer) as span:
                span.tag(TagMqTopic(topic))
                span.tag(TagMqBroker(_peer))
                span.layer = Layer.MQ

                # Work on a copy: the carrier items must travel with the
                # message, but the caller's own dict must not be modified.
                properties = {} if properties is None else dict(properties)
                for item in span.inject():
                    properties[item.key] = item.val

                return _send(this, content, properties=properties, partition_key=partition_key,
                             sequence_id=sequence_id, replication_clusters=replication_clusters,
                             disable_replication=disable_replication, event_timestamp=event_timestamp,
                             deliver_at=deliver_at, deliver_after=deliver_after)

        return _sw_send

    def _sw_receive_func(_receive):
        """Return a traced replacement for ``Consumer.receive`` wrapping ``_receive``."""
        def _sw_receive(this, timeout_millis=None):
            res = _receive(this, timeout_millis=timeout_millis)
            if not res:
                # Nothing to trace; hand the falsy result back unchanged.
                return res

            topic = res.topic_name().split('/')[-1]
            properties = res.properties()

            # Fill the carrier from the message properties that match its keys.
            carrier = Carrier()
            for item in carrier:
                val = properties.get(item.key)
                if val is not None:
                    item.val = val

            with get_context().new_entry_span(op=f'Pulsar/Topic/{topic}/Consumer', carrier=carrier) as span:
                span.tag(TagMqTopic(topic))
                span.tag(TagMqBroker(_peer))
                span.layer = Layer.MQ
                span.component = Component.PulsarConsumer
            return res

        return _sw_receive

    Client.__init__ = _sw_init
    Producer.send = _sw_send_func(_send)
    Consumer.receive = _sw_receive_func(_receive)