in elasticapm/instrumentation/packages/cassandra.py [0:0]
def call(self, module, method, wrapped, instance, args, kwargs):
    """Run the wrapped Cassandra call inside a ``db`` / ``cassandra`` span.

    Two kinds of call are distinguished by ``method``:

    * ``"Cluster.connect"`` -- the span action is ``"connect"``. The
      destination is taken from the first resolved contact point (or first
      resolved endpoint) of ``instance``. A keyspace passed as the first
      positional argument or as the ``keyspace`` keyword is recorded as the
      db instance.
    * anything else -- the span action is ``"query"``. The destination is
      taken from the first entry of ``instance.hosts``. The statement text is
      read from the query's ``query_string``, from its
      ``prepared_statement.query``, or from the query itself when it is a
      ``str``. When text is found, the span is named with
      ``extract_signature`` and the text is stored in the db context.

    :param module: not used by this method
    :param method: name of the instrumented method
    :param wrapped: the original callable
    :param instance: the object the call is made on
    :param args: positional arguments of the call
    :param kwargs: keyword arguments of the call
    :return: whatever ``wrapped(*args, **kwargs)`` returns
    """
    span_name = self.get_wrapped_name(wrapped, instance, method)
    extra = {}

    if method != "Cluster.connect":
        action = "query"
        first_host = list(instance.hosts)[0]
        if hasattr(first_host, "endpoint"):
            address = first_host.endpoint.address
            port_number = first_host.endpoint.port
        else:
            # < cassandra-driver 3.18
            address = first_host.address
            port_number = instance.cluster.port

        db_info = {}
        if instance.keyspace:
            db_info["instance"] = instance.keyspace

        # Work out the statement text; it stays None when the query object
        # matches none of the recognised forms.
        statement = args[0] if args else kwargs.get("query")
        statement_text = None
        if hasattr(statement, "query_string"):
            statement_text = statement.query_string
        elif hasattr(statement, "prepared_statement") and hasattr(statement.prepared_statement, "query"):
            statement_text = statement.prepared_statement.query
        elif isinstance(statement, str):
            statement_text = statement

        if statement_text:
            span_name = extract_signature(statement_text)
            db_info["type"] = "sql"
            db_info["statement"] = statement_text
        if db_info:
            extra["db"] = db_info
    else:
        action = "connect"
        if hasattr(instance, "contact_points_resolved"):
            # < cassandra-driver 3.18
            address = instance.contact_points_resolved[0]
            port_number = instance.port
        else:
            first_endpoint = instance.endpoints_resolved[0]
            address = first_endpoint.address
            port_number = first_endpoint.port

        target_keyspace: Optional[str] = args[0] if args else kwargs.get("keyspace")
        if target_keyspace:
            extra["db"] = {"instance": target_keyspace}

    extra["destination"] = {"address": address, "port": port_number}

    span = capture_span(
        span_name,
        span_type="db",
        span_subtype="cassandra",
        span_action=action,
        extra=extra,
        leaf=True,
    )
    with span:
        return wrapped(*args, **kwargs)