in skywalking/plugins/sw_kafka.py [0:0]
def _sw__poll_once_func(__poll_once):
    """Build a tracing replacement for a consumer ``_poll_once`` callable.

    Args:
        __poll_once: The original poll callable. It is invoked as
            ``__poll_once(this, timeout_ms, max_records, update_offsets=...)``.

    Returns:
        A function with the same signature that delegates to ``__poll_once``
        and, when the poll result is truthy, opens an entry span, feeds the
        headers of every polled record into a ``Carrier`` and tags the span
        with broker and topic information. The poll result itself is always
        returned unmodified.
    """

    def _header_text(value):
        """Return a header value as text suitable for a carrier item."""
        # str() on a bytes object yields its repr ("b'...'") rather than the
        # payload, so bytes-like values are decoded instead. errors='replace'
        # keeps this as non-raising as the str() call it replaces.
        if isinstance(value, (bytes, bytearray)):
            return bytes(value).decode('utf-8', errors='replace')
        return str(value)

    def _sw__poll_once(this, timeout_ms, max_records, update_offsets=True):
        """Poll via the wrapped callable and record an entry span on data."""
        res = __poll_once(this, timeout_ms, max_records, update_offsets=update_offsets)
        if not res:
            # Nothing was fetched: no span is created for an empty poll.
            return res

        brokers = ';'.join(this.config['bootstrap_servers'])
        context = get_context()
        # Fall back to the explicitly assigned partitions' topics when there
        # is no topic subscription.
        topics = ';'.join(this._subscription.subscription
                          or [t.topic for t in this._subscription._user_assignment])

        with context.new_entry_span(
                op=f"Kafka/{topics}/Consumer/{this.config['group_id'] or ''}") as span:
            for consumer_records in res.values():
                for record in consumer_records:
                    carrier = Carrier()
                    for item in carrier:
                        for header in record.headers:
                            # header is indexed as (key, value).
                            if item.key == header[0]:
                                item.val = _header_text(header[1])
                    span.extract(carrier)

            # Span-level metadata does not depend on individual records, so
            # it is set once per span.
            span.tag(TagMqBroker(brokers))
            span.tag(TagMqTopic(topics))
            span.layer = Layer.MQ
            span.component = Component.KafkaConsumer

        return res

    return _sw__poll_once