in elasticapm/instrumentation/packages/kafka.py [0:0]
def call(self, module, method, wrapped, instance, args, kwargs):
    """Run an instrumented Kafka client method with APM tracing around it.

    Behaviour depends on ``method``:

    * ``"KafkaProducer.send"`` -- delegated to ``self._trace_send`` unless the
      topic is ignored by the client or no transaction is active.
    * ``"KafkaConsumer.poll"`` -- wrapped in a ``"Kafka POLL"`` span when a
      transaction is active.
    * ``"KafkaConsumer.__next__"`` -- when a non-``"messaging"`` transaction is
      active the receive is recorded as a span (linked to the trace parent
      extracted from the message, if any); otherwise the currently running
      transaction (if any) is ended and a new ``"messaging"`` transaction is
      begun for the received message.
    * anything else -- passed straight through to ``wrapped``.

    If ``get_client()`` returns ``None`` the wrapped callable is invoked
    without any tracing.

    :param module: not used by this method
    :param method: name of the instrumented method, e.g. ``"KafkaProducer.send"``
    :param wrapped: the original callable being instrumented
    :param instance: object the method is bound to; for ``poll`` its
        ``_subscription.subscription`` is read to name the span
    :param args: positional arguments for ``wrapped``
    :param kwargs: keyword arguments for ``wrapped``
    :return: whatever ``wrapped(*args, **kwargs)`` returns
    :raises StopIteration: propagated unchanged from ``wrapped`` in the
        ``__next__`` case
    """
    client = get_client()
    if client is None:
        return wrapped(*args, **kwargs)

    # Shared by all branches; the __next__ span branch appends the topic to
    # "resource" after the message has been received.
    destination_info = {
        "service": {"name": "kafka", "resource": "kafka/", "type": "messaging"},
    }

    if method == "KafkaProducer.send":
        topic = args[0] if args else kwargs["topic"]
        if client.should_ignore_topic(topic) or not execution_context.get_transaction():
            return wrapped(*args, **kwargs)
        # Positional unpacking is written before the keyword argument; this is
        # the same call as before, just without the misleading ordering.
        return self._trace_send(instance, wrapped, *args, destination_info=destination_info, **kwargs)

    elif method == "KafkaConsumer.poll":
        if not execution_context.get_transaction():
            return wrapped(*args, **kwargs)
        with capture_span(
            name="Kafka POLL",
            span_type="messaging",
            span_subtype=self.provider_name,
            span_action="poll",
            leaf=True,
            extra={
                "destination": destination_info,
            },
        ) as span:
            subscribed = instance._subscription.subscription
            if span and not isinstance(span, DroppedSpan) and subscribed:
                # Sorted so the span name is stable regardless of set ordering.
                span.name += " from " + ", ".join(sorted(subscribed))
            return wrapped(*args, **kwargs)

    elif method == "KafkaConsumer.__next__":
        transaction = execution_context.get_transaction()
        if transaction and transaction.transaction_type != "messaging":
            # Somebody started a transaction outside of the consumer, so the
            # receive is captured as a span and the causal trace is recorded
            # as a span link.
            with capture_span(
                name="consumer",
                span_type="messaging",
                span_subtype=self.provider_name,
                span_action="receive",
                leaf=True,
                extra={
                    "message": {"queue": {"name": ""}},
                    "destination": destination_info,
                },
            ) as span:
                try:
                    result = wrapped(*args, **kwargs)
                except StopIteration:
                    # Nothing was received, so there is nothing worth reporting.
                    if span:
                        span.cancel()
                    raise
                if span and not isinstance(span, DroppedSpan):
                    topic = result[0]
                    if client.should_ignore_topic(topic):
                        span.cancel()
                        return result
                    trace_parent = self.get_traceparent_from_result(result)
                    if trace_parent:
                        span.add_link(trace_parent)
                    # The topic is only known now, so fill in the placeholders
                    # that were used when the span was opened.
                    destination_info["service"]["resource"] += topic
                    span.context["message"]["queue"]["name"] = topic
                    span.name = "Kafka RECEIVE from " + topic
                return result
        else:
            # No transaction running, or this is a transaction started by us,
            # so end it and start the next one -- unless StopIteration is
            # raised by ``wrapped``, in which case no new transaction is begun.
            if transaction:
                client.end_transaction()
            result = wrapped(*args, **kwargs)
            topic = result[0]
            if client.should_ignore_topic(topic):
                return result
            trace_parent = self.get_traceparent_from_result(result)
            transaction = client.begin_transaction("messaging", trace_parent=trace_parent)
            # NOTE(review): begin_transaction is believed to return None when
            # the agent is not recording -- confirm. Guarding here keeps the
            # consumer loop from failing with AttributeError in that case.
            # NOTE(review): timestamp_type 0 presumably means the timestamp is
            # the message creation time -- confirm against the Kafka client.
            # For other timestamp types no message context is set at all.
            if transaction is not None and result.timestamp_type == 0:
                now_ms = int(round(time.time() * 1000))
                transaction.context = {
                    "message": {"age": {"ms": now_ms - result.timestamp}, "queue": {"name": topic}},
                    "service": {"framework": {"name": "Kafka"}},
                }
            elasticapm.set_transaction_name("Kafka RECEIVE from " + topic, override=True)
            elasticapm.set_transaction_result(constants.OUTCOME.SUCCESS, override=False)
            return result

    else:
        # Unknown method name: never swallow the call, just run it untraced.
        return wrapped(*args, **kwargs)