elasticapm/contrib/asgi.py (145 lines of code) (raw):
# BSD 3-Clause License
#
# Copyright (c) 2022, Elasticsearch BV
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import functools
import urllib.parse
from typing import TYPE_CHECKING, Optional, Tuple, Union
if TYPE_CHECKING:
from asgiref.typing import ASGIApplication, ASGIReceiveCallable, ASGISendCallable, Scope, ASGISendEvent
import elasticapm
from elasticapm import Client, get_client, instrument
from elasticapm.conf import constants
from elasticapm.contrib.asyncio.traces import set_context
from elasticapm.utils import default_ports, encoding
from elasticapm.utils.disttracing import TraceParent
def wrap_send(send, middleware):
    """Return a wrapper around the ASGI ``send`` callable that records response data.

    Args:
        send: The ASGI ``send`` awaitable to wrap.
        middleware: Object providing ``get_data_from_response(message, event_type)``;
            it is used to build the "response" context.

    Returns:
        An async callable taking a single ASGI message. Every message is
        forwarded to ``send``; ``http.response.start`` messages additionally
        update the transaction context, result and outcome first.
    """

    @functools.wraps(send)
    async def wrapped_send(message) -> None:
        is_response_start = message.get("type") == "http.response.start"
        if is_response_start:
            # Hand set_context a callable so the response data is produced by
            # the middleware's own (overridable) extraction method.
            await set_context(
                lambda: middleware.get_data_from_response(message, constants.TRANSACTION),
                "response",
            )
            status_code = message["status"]
            # e.g. 204 -> "HTTP 2xx"
            elasticapm.set_transaction_result(f"HTTP {status_code // 100}xx", override=False)
            elasticapm.set_transaction_outcome(http_status_code=status_code, override=False)
        # Always pass the message on to the real send callable.
        await send(message)

    return wrapped_send
class ASGITracingMiddleware:
    """ASGI middleware that records an APM transaction for each HTTP request.

    Scopes whose ``type`` is not ``"http"`` are passed to the wrapped
    application untouched. For HTTP scopes a transaction is started (unless the
    client says the URL should be ignored), request/response context is
    attached, and exceptions raised by the application are captured and
    re-raised.
    """

    __slots__ = ("_app", "client")

    def __init__(self, app: "ASGIApplication", **options) -> None:
        """Wrap ``app``.

        Args:
            app: The ASGI application to wrap.
            **options: Passed to ``Client(...)`` only when ``get_client()``
                returns no existing client.
        """
        self._app = app
        client = get_client()
        if not client:
            client = Client(**options)
        self.client = client
        if self.client.config.instrument and self.client.config.enabled:
            instrument()

    async def __call__(self, scope: "Scope", receive: "ASGIReceiveCallable", send: "ASGISendCallable") -> None:
        """Handle one ASGI connection.

        Args:
            scope: The ASGI connection scope.
            receive: The ASGI ``receive`` callable.
            send: The ASGI ``send`` callable.

        Raises:
            Exception: Whatever the wrapped application raises is captured by
                the client and then re-raised unchanged.
        """
        if scope["type"] != "http":
            await self._app(scope, receive, send)
            return

        send = wrap_send(send, self)
        wrapped_receive = receive
        url, _ = self.get_url(scope)
        body = None
        if not self.client.should_ignore_url(url):
            self.client.begin_transaction(
                transaction_type="request", trace_parent=TraceParent.from_headers(scope["headers"])
            )
            self.set_transaction_name(scope["method"], url)
            if scope["method"] in constants.HTTP_WITH_BODY and self.client.config.capture_body != "off":
                # Read the whole request body up front so it can be captured,
                # keeping every received message so it can be replayed below.
                messages = []
                more_body = True
                while more_body:
                    message = await receive()
                    messages.append(message)
                    more_body = message.get("more_body", False)

                body_raw = b"".join([message.get("body", b"") for message in messages])
                # Decode with the default encoding, dropping undecodable bytes.
                body = str(body_raw, errors="ignore")

                async def new_wrapped_receive():
                    # Replay the buffered messages to the application first.
                    if messages:
                        return messages.pop(0)

                    # Once that's done we can just await any other messages.
                    return await receive()

                wrapped_receive = new_wrapped_receive

            await set_context(lambda: self.get_data_from_request(scope, constants.TRANSACTION, body), "request")

        try:
            # Dispatch to the ASGI callable
            await self._app(scope, wrapped_receive, send)
            elasticapm.set_transaction_outcome(constants.OUTCOME.SUCCESS, override=False)
            return
        except Exception:
            self.client.capture_exception()
            elasticapm.set_transaction_result("HTTP 5xx", override=False)
            elasticapm.set_transaction_outcome(constants.OUTCOME.FAILURE, override=True)
            elasticapm.set_context({"status_code": 500}, "response")
            # Bare ``raise`` re-raises the application's exception as-is,
            # keeping its traceback and any ``__cause__``/``__context__``.
            raise
        finally:
            self.client.end_transaction()

    def get_headers(self, scope_or_message: Union["Scope", "ASGISendEvent"]) -> dict[str, str]:
        """Convert the ``headers`` entry of a scope or message into a ``str`` dict.

        Names and values are decoded as latin-1. Values of a repeated header
        name are joined with ``", "``. A missing ``headers`` entry yields an
        empty dict.
        """
        headers = {}
        for k, v in scope_or_message.get("headers", {}):
            key = k.decode("latin1")
            val = v.decode("latin1")
            if key in headers:
                headers[key] = f"{headers[key]}, {val}"
            else:
                headers[key] = val
        return headers

    def get_url(self, scope: "Scope", host: Optional[str] = None) -> Tuple[str, dict[str, str]]:
        """Build the request URL from the scope.

        Args:
            scope: The ASGI connection scope.
            host: Optional host to use instead of the scope's ``server`` entry.

        Returns:
            A ``(url, url_dict)`` tuple. ``url`` is the URL string; ``url_dict``
            holds ``protocol``, ``full`` and, when available, ``hostname``,
            ``port`` and ``search``.
        """
        url_dict = {}
        scheme = scope.get("scheme", "http")
        server = scope.get("server", None)
        path = scope.get("root_path", "") + scope.get("path", "")

        url_dict["protocol"] = scheme + ":"

        if host:
            url = f"{scheme}://{host}{path}"
            url_dict["hostname"] = host
        elif server is not None:
            host, port = server
            url_dict["hostname"] = host
            if port:
                url_dict["port"] = port
            # Only spell out the port when there is one and it is not the
            # scheme's default. ``server`` may carry ``None`` as its port,
            # which must not be rendered as ":None" in the URL.
            if port and port != default_ports.get(scheme, None):
                url = f"{scheme}://{host}:{port}{path}"
            else:
                url = f"{scheme}://{host}{path}"
        else:
            url = path
        qs = scope.get("query_string")
        if qs:
            query = "?" + urllib.parse.unquote(qs.decode("latin-1"))
            url += query
            url_dict["search"] = encoding.keyword_field(query)
        url_dict["full"] = encoding.keyword_field(url)
        return url, url_dict

    def get_ip(self, scope: "Scope", headers: dict) -> Optional[str]:
        """Determine the remote address of the request.

        Checked in order: the first entry of the ``x-forwarded-for`` header,
        the ``remote-addr`` header, then the first element of the scope's
        ``client`` entry.

        Returns:
            The address, or ``None`` when none of the sources is present.
        """
        x_forwarded_for = headers.get("x-forwarded-for")
        if x_forwarded_for:
            # List entries may be surrounded by optional whitespace.
            return x_forwarded_for.split(",")[0].strip()
        remote_addr = headers.get("remote-addr")
        if remote_addr:
            return remote_addr
        client = scope.get("client")
        if client:
            return client[0]
        return None

    async def get_data_from_request(self, scope: "Scope", event_type: str, body: Optional[str]) -> dict:
        """Loads data from incoming request for APM capturing.

        Args:
            scope (Scope): the ASGI connection scope
            event_type (str): compared against the ``capture_body`` setting
            body (str): the already-decoded request body, if any

        Returns:
            dict
        """
        headers = self.get_headers(scope)
        result = {
            "method": scope["method"],
            "socket": {"remote_address": self.get_ip(scope, headers)},
            # NOTE(review): the standard request header is named "cookie", so
            # this pop presumably always yields the default -- confirm intent.
            "cookies": headers.pop("cookies", {}),
        }
        if self.client.config.capture_headers:
            result["headers"] = headers
        # The body is only attached when capture_body is "all" or names this event type.
        if body and self.client.config.capture_body in ("all", event_type):
            result["body"] = body
        _, url_dict = self.get_url(scope)
        result["url"] = url_dict

        return result

    async def get_data_from_response(self, message: dict, event_type: str) -> dict:
        """Loads data from response for APM capturing.

        Args:
            message (dict): the ASGI message being sent
            event_type (str): not used by this implementation

        Returns:
            dict
        """
        result = {}

        if "status" in message:
            result["status_code"] = message["status"]

        if self.client.config.capture_headers and "headers" in message:
            headers = self.get_headers(message)
            # Omit the key entirely when there are no headers.
            if headers:
                result["headers"] = headers

        return result

    def set_transaction_name(self, method: str, url: str) -> None:
        """
        Default implementation sets transaction name to "METHOD unknown route".
        Subclasses may add framework specific naming.
        """
        elasticapm.set_transaction_name(f"{method.upper()} unknown route")