def _sw_send_func()

in skywalking/plugins/sw_kafka.py [0:0]


def _sw_send_func(_send):
    def _sw_send(this, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
        # ignore trace, log and meter reporter - skywalking self request
        if config.agent_protocol == 'kafka' and \
                (config.kafka_topic_segment == topic
                 or config.kafka_topic_log == topic
                 or config.kafka_topic_management == topic
                 or config.kafka_topic_meter == topic):
            return _send(this, topic, value=value, key=key, headers=headers, partition=partition,
                         timestamp_ms=timestamp_ms)

        peer = ';'.join(this.config['bootstrap_servers'])
        context = get_context()
        with context.new_exit_span(op=f'Kafka/{topic}/Producer' or '/', peer=peer,
                                   component=Component.KafkaProducer) as span:
            carrier = span.inject()
            span.layer = Layer.MQ

            if headers is None:
                headers = []
            for item in carrier:
                headers.append((item.key, item.val.encode('utf-8')))

            res = _send(this, topic, value=value, key=key, headers=headers, partition=partition,
                        timestamp_ms=timestamp_ms)
            span.tag(TagMqBroker(peer))
            span.tag(TagMqTopic(topic))

            return res

    return _sw_send