in skywalking/plugins/sw_kafka.py [0:0]
def _sw_send_func(_send):
def _sw_send(this, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
# ignore trace, log and meter reporter - skywalking self request
if config.agent_protocol == 'kafka' and \
(config.kafka_topic_segment == topic
or config.kafka_topic_log == topic
or config.kafka_topic_management == topic
or config.kafka_topic_meter == topic):
return _send(this, topic, value=value, key=key, headers=headers, partition=partition,
timestamp_ms=timestamp_ms)
peer = ';'.join(this.config['bootstrap_servers'])
context = get_context()
with context.new_exit_span(op=f'Kafka/{topic}/Producer' or '/', peer=peer,
component=Component.KafkaProducer) as span:
carrier = span.inject()
span.layer = Layer.MQ
if headers is None:
headers = []
for item in carrier:
headers.append((item.key, item.val.encode('utf-8')))
res = _send(this, topic, value=value, key=key, headers=headers, partition=partition,
timestamp_ms=timestamp_ms)
span.tag(TagMqBroker(peer))
span.tag(TagMqTopic(topic))
return res
return _sw_send