in elasticapm/instrumentation/packages/elasticsearch.py [0:0]
def _update_context_by_request_data(self, context, instance, args, kwargs) -> None:
    """
    Fill the span ``context`` with details about an Elasticsearch request.

    ``context["db"]`` is always set to ``{"type": "elasticsearch"}``. When the
    request URL matches ``should_capture_body_re``, a ``statement`` entry is
    added that holds the ``q`` query parameter and/or the request body
    (separated by a blank line if both are present).
    ``context["destination"]`` is set to the address and port read from
    ``instance``.

    :param context: mutable mapping that is updated in place
    :param instance: object exposing ``host`` and, depending on the client
        version, ``hostname`` and/or ``port``
    :param args: positional arguments of the wrapped call; index 1 is read as
        the URL, index 2 as the params and index 3 as the body
    :param kwargs: keyword arguments of the wrapped call; ``url``, ``params``
        and ``body`` are used when the positional counterpart is absent
    """
    args_len = len(args)
    url = args[1] if args_len > 1 else kwargs.get("url")
    params = args[2] if args_len > 2 else kwargs.get("params")
    body_serialized = args[3] if args_len > 3 else kwargs.get("body")

    # Collecting span metadata must not break the instrumented call: a missing
    # URL is treated as empty instead of raising a TypeError further down.
    if url is None:
        url = ""

    # The query string may be embedded in the URL instead of being passed as
    # `params`; in that case split it off and use the first value of each key.
    if "?" in url and not params:
        url, qs = url.split("?", 1)
        params = {k: v[0] for k, v in parse_qs(qs).items()}

    context["db"] = {"type": "elasticsearch"}
    if should_capture_body_re.search(url):
        query = []
        # using both q AND body is allowed in some API endpoints / ES versions,
        # but not in others. We simply capture both if they are there so the
        # user can see it.
        if params and "q" in params:
            # 'q' may already be encoded to a byte string at this point.
            # We assume utf8, which is the default
            q = params["q"]
            if isinstance(q, bytes):
                q = q.decode("utf-8", errors="replace")
            # str() is a no-op for strings and keeps other value types from
            # raising a TypeError on concatenation
            query.append("q=" + str(q))
        if body_serialized:
            if isinstance(body_serialized, bytes):
                body_serialized = body_serialized.decode("utf-8", errors="replace")
            # str() guarantees that the join below only ever sees strings
            query.append(str(body_serialized))
        if query:
            context["db"]["statement"] = "\n\n".join(query)

    # ES5: `host` is URL, no `port` attribute
    # ES6, ES7: `host` URL, `hostname` is host, `port` is port
    # ES8: `host` is hostname, no `hostname` attribute, `port` is `port`
    if not hasattr(instance, "port"):
        # ES5, parse hostname and port from URL stored in `host`
        parsed_url = urlparse(instance.host)
        host, port = parsed_url.hostname, parsed_url.port
    elif not hasattr(instance, "hostname"):
        # ES8 (and up, one can hope)
        host, port = instance.host, instance.port
    else:
        # ES6, ES7
        host, port = instance.hostname, instance.port
    context["destination"] = {"address": host, "port": port}