def _trace_send()

in elasticapm/instrumentation/packages/kafka.py [0:0]


    def _trace_send(self, instance, wrapped, *args, destination_info=None, **kwargs):
        """Run a producer send inside a messaging span and propagate the trace parent.

        The topic is taken from the first positional argument, falling back to
        the ``topic`` keyword. When a transaction is active, a binary
        traceparent header is added to the message headers handed to
        ``wrapped``. Without an active transaction the call is forwarded with
        its arguments untouched.

        Args:
            instance: Object the wrapped callable belongs to. After the call its
                ``_metadata.controller`` is read to fill in the span's
                destination address and port.
            wrapped: The original send callable.
            *args: Positional arguments forwarded to ``wrapped``.
            destination_info: Dict containing a ``["service"]["resource"]``
                string; the topic is appended to it in place and the dict is
                attached to the span as its destination. It must be supplied
                despite the ``None`` default because it is indexed
                unconditionally.
            **kwargs: Keyword arguments forwarded to ``wrapped``.

        Returns:
            Whatever ``wrapped`` returns.
        """
        topic = args[0] if args else kwargs["topic"]
        # NOTE(review): index 4 presumes headers is the fifth positional
        # argument of the wrapped call -- confirm against its signature.
        headers = args[4] if len(args) > 4 else kwargs.get("headers")

        span_name = f"Kafka SEND to {topic}"
        destination_info["service"]["resource"] += topic
        with capture_span(
            name=span_name,
            span_type="messaging",
            span_subtype=self.provider_name,
            span_action="send",
            leaf=True,
            extra={
                "message": {"queue": {"name": topic}},
                "destination": destination_info,
            },
        ) as span:
            transaction = execution_context.get_transaction()
            # Only inject a traceparent when there is a transaction to derive it
            # from; otherwise there is nothing to propagate and the send must
            # still go through unchanged.
            if transaction:
                tp = transaction.trace_parent.copy_from(
                    span_id=span.id if span else transaction.id,
                    trace_options=None if span else TracingOptions(recorded=False),
                )
                traceparent_header = (constants.TRACEPARENT_BINARY_HEADER_NAME, tp.to_binary())
                # Build a fresh list rather than appending to the caller's
                # object, so a headers list reused across sends does not
                # accumulate one traceparent entry per call.
                headers = [*(headers or ()), traceparent_header]
                if len(args) > 4:
                    args = list(args)
                    args[4] = headers
                else:
                    kwargs["headers"] = headers
            result = wrapped(*args, **kwargs)
            if span and instance and instance._metadata.controller and not isinstance(span, DroppedSpan):
                controller = instance._metadata.controller
                span.context["destination"]["address"] = controller[1]
                span.context["destination"]["port"] = controller[2]
            return result