elasticapm/contrib/sanic/utils.py (84 lines of code) (raw):
# BSD 3-Clause License
#
# Copyright (c) 2012, the Sentry Team, see AUTHORS for more details
# Copyright (c) 2019, Elasticsearch BV
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from string import ascii_uppercase
from typing import Dict
from sanic import Sanic
from sanic import __version__ as version
from sanic.cookies import Cookie, CookieJar
from sanic.request import Request
from sanic.response import HTTPResponse
from elasticapm.base import Client
from elasticapm.conf import Config, constants
from elasticapm.contrib.sanic.sanic_types import EnvInfoType
from elasticapm.utils import get_url_dict
class SanicAPMConfig(dict):
    """
    Dictionary of Elastic APM settings taken from a Sanic application's config.

    Every entry of ``app.config`` whose key starts with ``ELASTIC_APM_`` is copied
    into this dictionary under the same key with that leading prefix removed.
    Entries without the prefix are ignored.
    """

    def __init__(self, app: Sanic) -> None:
        """
        :param app: Application whose ``config`` mapping is scanned for ``ELASTIC_APM_`` keys
        """
        super(SanicAPMConfig, self).__init__()
        prefix = "ELASTIC_APM_"
        for _key, _v in app.config.items():
            if _key.startswith(prefix):
                # Slice off the leading prefix only. str.replace() would also strip
                # any later occurrence of the prefix inside the key and corrupt it.
                self[_key[len(prefix) :]] = _v
def get_env(request: Request) -> EnvInfoType:
    """
    Yield server environment details that are available on the current request

    :param request: Inbound HTTP Request
    :return: ``(attribute, value)`` pairs for each of ``server_name``, ``server_port`` and ``version``
        that exists on the request; attributes the request does not have are skipped
    """
    candidates = ("server_name", "server_port", "version")
    yield from ((name, getattr(request, name)) for name in candidates if hasattr(request, name))
# noinspection PyBroadException
async def get_request_info(config: Config, request: Request, event_type: str) -> Dict[str, str]:
    """
    Generate traceable context information from the inbound HTTP request

    :param config: Application Configuration used to tune the way the data is captured
    :param request: Inbound HTTP request
    :param event_type: the event type (such as constants.TRANSACTION) for determining whether to capture the body
    :return: A dictionary containing the context information of the ongoing transaction. It always has the
        keys ``env``, ``method``, ``socket``, ``cookies``, ``http_version``, ``body`` and ``url``;
        ``headers`` is present only when ``config.capture_headers`` is truthy
    """
    env = dict(get_env(request=request))
    # Only configuration keys made up exclusively of uppercase ASCII letters pass this
    # filter; keys that contain underscores or digits are not copied into the env.
    app_config = {k: v for k, v in dict(request.app.config).items() if all(letter in ascii_uppercase for letter in k)}
    env.update(app_config)
    result = {
        "env": env,
        "method": request.method,
        "socket": {
            "remote_address": _get_client_ip(request=request),
            "encrypted": request.scheme in ["https", "wss"],
        },
        "cookies": request.cookies,
        "http_version": request.version,
    }
    if config.capture_headers:
        result["headers"] = dict(request.headers)

    if request.method in constants.HTTP_WITH_BODY and config.capture_body in ("all", event_type):
        content_type = request.content_type
        if content_type.startswith("multipart") or "octet-stream" in content_type:
            # Multipart and binary payloads are never captured.
            result["body"] = "[DISCARDED]"
        else:
            # Decode only in the non-discarded case; decoding unconditionally would
            # overwrite the "[DISCARDED]" placeholder set above.
            try:
                result["body"] = request.body.decode("utf-8")
            except Exception:
                # Best effort: a body that cannot be decoded falls through to the
                # "[REDACTED]" placeholder below instead of failing the capture.
                pass

    # Reached when body capture is disabled for this event type, the method carries
    # no body, or decoding failed.
    if "body" not in result:
        result["body"] = "[REDACTED]"
    result["url"] = get_url_dict(request.url)
    return result
async def get_response_info(config: Config, response: HTTPResponse, event_type: str) -> Dict[str, str]:
    """
    Generate traceable context information from the outbound HTTP response

    :param config: Application Configuration used to tune the way the data is captured
    :param response: outbound HTTP Response
    :param event_type: the event type (such as constants.TRANSACTION) for determining whether to capture the body
    :return: A dictionary containing the context information of the ongoing transaction. It always has the
        keys ``cookies``, ``finished``, ``headers_sent`` and ``body``; ``status_code`` is present only when
        the response status is an ``int`` and ``headers`` only when ``config.capture_headers`` is truthy
    """
    result = {
        "cookies": _transform_response_cookie(cookies=response.cookies),
        "finished": True,
        "headers_sent": True,
    }
    if isinstance(response.status, int):
        result["status_code"] = response.status

    if config.capture_headers:

        def normalize(v):
            # we are getting entries for Set-Cookie headers as Cookie instances
            if isinstance(v, Cookie):
                return str(v)
            return v

        result["headers"] = {k: normalize(v) for k, v in response.headers.items()}

    # A response may carry no content type; treat that as an empty string so the
    # membership test below cannot raise TypeError on None.
    content_type = response.content_type or ""
    if config.capture_body in ("all", event_type) and "octet-stream" not in content_type:
        try:
            result["body"] = response.body.decode("utf-8")
        except (AttributeError, UnicodeDecodeError):
            # Best effort, mirroring the request side: a missing body or one that is
            # not valid UTF-8 is reported with the placeholder instead of raising.
            result["body"] = "[REDACTED]"
    else:
        result["body"] = "[REDACTED]"

    return result
def _get_client_ip(request: Request) -> str:
    """
    Extract Client IP Address Information

    :param request: Inbound HTTP request
    :return: The first truthy value among ``request.ip`` and ``request.socket[0]``, otherwise
        ``request.remote_addr``
    """
    try:
        address = request.ip
        if not address:
            address = request.socket[0]
        if not address:
            address = request.remote_addr
    except IndexError:
        # e.g. an empty ``request.socket`` sequence: fall back to the remote address
        address = request.remote_addr
    return address
def make_client(client_cls=Client, **defaults) -> Client:
    """
    Instantiate an APM client, tagging it with the Sanic framework details when none were given

    :param client_cls: Callable used to build the client; invoked with ``defaults`` as keyword arguments
    :param defaults: Keyword arguments forwarded to ``client_cls``
    :return: Whatever ``client_cls(**defaults)`` returns
    """
    framework_missing = "framework_name" not in defaults
    if framework_missing:
        # Name and version are set together, and only when the caller did not name a framework.
        defaults.update(framework_name="sanic", framework_version=version)
    return client_cls(**defaults)
def _transform_response_cookie(cookies: CookieJar) -> Dict[str, str]:
    """
    Transform Sanic's CookieJar instance into a plain dictionary to build the context

    :param cookies: Cookie container of the outbound response
    :return: A mapping from cookie name to a ``{"value": ..., "path": ...}`` dictionary; empty when
        reading ``cookies.cookies`` raises ``KeyError``
    """
    transformed = {}

    # old sanic versions used to have an items() method
    if hasattr(cookies, "items"):
        for name, morsel in cookies.items():
            transformed[name] = {"value": morsel.value, "path": morsel["path"]}
        return transformed

    try:
        for entry in cookies.cookies:
            transformed[entry.key] = {"value": entry.value, "path": entry.path}
    except KeyError:
        # cookies.cookies assumes Set-Cookie header will be there
        return {}
    return transformed