in elasticapm/instrumentation/packages/kafka.py [0:0]
def _trace_send(self, instance, wrapped, *args, destination_info=None, **kwargs):
topic = args[0] if args else kwargs["topic"]
headers = args[4] if len(args) > 4 else kwargs.get("headers", None)
span_name = f"Kafka SEND to {topic}"
destination_info["service"]["resource"] += topic
with capture_span(
name=span_name,
span_type="messaging",
span_subtype=self.provider_name,
span_action="send",
leaf=True,
extra={
"message": {"queue": {"name": topic}},
"destination": destination_info,
},
) as span:
transaction = execution_context.get_transaction()
if transaction:
tp = transaction.trace_parent.copy_from(
span_id=span.id if span else transaction.id,
trace_options=None if span else TracingOptions(recorded=False),
)
if headers:
headers.append((constants.TRACEPARENT_BINARY_HEADER_NAME, tp.to_binary()))
else:
headers = [(constants.TRACEPARENT_BINARY_HEADER_NAME, tp.to_binary())]
if len(args) > 4:
args = list(args)
args[4] = headers
else:
kwargs["headers"] = headers
result = wrapped(*args, **kwargs)
if span and instance and instance._metadata.controller and not isinstance(span, DroppedSpan):
address = instance._metadata.controller[1]
port = instance._metadata.controller[2]
span.context["destination"]["address"] = address
span.context["destination"]["port"] = port
return result