def call()

in elasticapm/instrumentation/packages/kafka.py [0:0]


    def call(self, module, method, wrapped, instance, args, kwargs):
        """
        Run the wrapped Kafka call, adding tracing that depends on ``method``.

        Three method names are recognised:

        * ``KafkaProducer.send`` -- delegated to ``self._trace_send`` unless the
          topic is ignored or no transaction is active.
        * ``KafkaConsumer.poll`` -- wrapped in a "Kafka POLL" span when a
          transaction is active.
        * ``KafkaConsumer.__next__`` -- recorded as a span inside a transaction
          of a type other than "messaging"; otherwise a new "messaging"
          transaction is started for the received record.

        :param module: not used by this method
        :param method: name of the instrumented method, compared against the
            three names above
        :param wrapped: the original callable, invoked as ``wrapped(*args, **kwargs)``
        :param instance: the object the call was made on; its
            ``_subscription.subscription`` is read for ``KafkaConsumer.poll``
        :param args: positional arguments of the intercepted call
        :param kwargs: keyword arguments of the intercepted call
        :return: whatever ``wrapped`` (or ``self._trace_send``) returns; ``None``
            if ``method`` is none of the recognised names
        """
        apm_client = get_client()
        if apm_client is None:
            # no client available: run the original call untouched
            return wrapped(*args, **kwargs)

        destination_info = {
            "service": {"name": "kafka", "resource": "kafka/", "type": "messaging"},
        }

        if method == "KafkaProducer.send":
            if args:
                topic = args[0]
            else:
                topic = kwargs["topic"]
            # only trace when the topic is not ignored and a transaction is active
            traceable = not apm_client.should_ignore_topic(topic) and execution_context.get_transaction()
            if traceable:
                return self._trace_send(instance, wrapped, *args, destination_info=destination_info, **kwargs)
            return wrapped(*args, **kwargs)

        if method == "KafkaConsumer.poll":
            if not execution_context.get_transaction():
                return wrapped(*args, **kwargs)
            with capture_span(
                name="Kafka POLL",
                span_type="messaging",
                span_subtype=self.provider_name,
                span_action="poll",
                leaf=True,
                extra={"destination": destination_info},
            ) as span:
                if span and not isinstance(span, DroppedSpan):
                    if instance._subscription.subscription:
                        # append the subscribed topics to the span name
                        subscribed = sorted(instance._subscription.subscription)
                        span.name += " from " + ", ".join(subscribed)
                return wrapped(*args, **kwargs)

        if method == "KafkaConsumer.__next__":
            active = execution_context.get_transaction()
            started_elsewhere = bool(active) and active.transaction_type != "messaging"

            if not started_elsewhere:
                # Either nothing is running, or the running transaction is a
                # "messaging" one (started by this instrumentation): end it and
                # begin the next one. If StopIteration is raised by the wrapped
                # call, no new transaction is started.
                if active:
                    apm_client.end_transaction()
                record = wrapped(*args, **kwargs)
                topic = record[0]  # first field of the record is used as the topic
                if apm_client.should_ignore_topic(topic):
                    return record
                parent = self.get_traceparent_from_result(record)
                new_transaction = apm_client.begin_transaction("messaging", trace_parent=parent)
                # NOTE(review): timestamp_type 0 presumably means the timestamp is
                # the record's creation time -- confirm against the Kafka client.
                if record.timestamp_type == 0:
                    now_ms = int(round(time.time() * 1000))
                    new_transaction.context = {
                        "message": {"age": {"ms": now_ms - record.timestamp}, "queue": {"name": topic}},
                        "service": {"framework": {"name": "Kafka"}},
                    }
                elasticapm.set_transaction_name("Kafka RECEIVE from " + topic, override=True)
                elasticapm.set_transaction_result(constants.OUTCOME.SUCCESS, override=False)
                return record

            # A transaction of another type was started outside of the consumer,
            # so the receive is captured as a span and the causal trace is
            # recorded as a span link.
            span_extra = {
                "message": {"queue": {"name": ""}},
                "destination": destination_info,
            }
            with capture_span(
                name="consumer",
                span_type="messaging",
                span_subtype=self.provider_name,
                span_action="receive",
                leaf=True,
                extra=span_extra,
            ) as span:
                try:
                    record = wrapped(*args, **kwargs)
                except StopIteration:
                    # iteration finished without a record: drop the span
                    if span:
                        span.cancel()
                    raise
                if not span or isinstance(span, DroppedSpan):
                    return record
                topic = record[0]  # first field of the record is used as the topic
                if apm_client.should_ignore_topic(topic):
                    span.cancel()
                    return record
                parent = self.get_traceparent_from_result(record)
                if parent:
                    span.add_link(parent)
                # the topic is only known after the receive, so fill it in now
                destination_info["service"]["resource"] += topic
                span.context["message"]["queue"]["name"] = topic
                span.name = "Kafka RECEIVE from " + topic
                return record