elasticapm/utils/__init__.py (142 lines of code) (raw):

# BSD 3-Clause License
#
# Copyright (c) 2012, the Sentry Team, see AUTHORS for more details
# Copyright (c) 2019, Elasticsearch BV
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
#   list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
#   this list of conditions and the following disclaimer in the documentation
#   and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
#   contributors may be used to endorse or promote products derived from
#   this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED.
# IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import base64
import os
import re
import socket
import urllib.parse
from functools import partial, partialmethod
from types import FunctionType
from typing import Pattern

from elasticapm.conf import constants
from elasticapm.utils import encoding

partial_types = (partial, partialmethod)

# Ports that are implied by the URL scheme when none is given explicitly.
default_ports = {"https": 443, "http": 80, "postgresql": 5432, "mysql": 3306, "mssql": 1433}

# Cache for getfqdn(); filled on first successful lookup.
fqdn = None


def varmap(func, var, context=None, name=None, **kwargs):
    """
    Executes ``func(key_name, value)`` on all values, recursively discovering
    dict and list scoped values.

    :param func: callable invoked as ``func(name, value, **kwargs)``
    :param var: the value to process
    :param context: set of ``id()`` values of the objects currently being
        processed; used to detect reference cycles. Created on the first call.
    :param name: key under which ``var`` was found (``None`` at the top level)
    :return: the transformed value. Lists and tuples are both returned as lists.

    An object that is encountered again while it is still being processed is
    not descended into; ``func`` is called with the placeholder ``"<...>"``.
    """
    if context is None:
        context = set()
    objid = id(var)
    if objid in context:
        return func(name, "<...>", **kwargs)
    context.add(objid)

    # Apply func() before recursion, so that `shorten()` doesn't have to iterate over all the trimmed values
    ret = func(name, var, **kwargs)
    if isinstance(ret, dict):
        # iterate over a copy of the dictionary to avoid "dictionary changed size during iteration" issues
        ret = {k: varmap(func, v, context, k, **kwargs) for k, v in ret.copy().items()}
    elif isinstance(ret, (list, tuple)):
        # sequence items have no key of their own, so they inherit the parent's name
        ret = [varmap(func, f, context, name, **kwargs) for f in ret]
    context.remove(objid)
    return ret


def get_name_from_func(func: FunctionType) -> str:
    """
    Build a dotted ``module.name`` string for a callable.

    Partials (``functools.partial`` / ``partialmethod`` instances, or objects
    carrying a ``_partialmethod`` / ``__partialmethod__`` attribute with a
    ``func``) are rendered as ``partial(<name of wrapped function>)``.

    For everything else the name part is, in order of preference,
    ``func.view_class.__name__``, ``func.__name__``, or the name of the
    object's class.
    """
    # partials don't have `__module__` or `__name__`, so we use the values from the "inner" function
    if isinstance(func, partial_types):
        return "partial({})".format(get_name_from_func(func.func))
    elif hasattr(func, "_partialmethod") and hasattr(func._partialmethod, "func"):
        return "partial({})".format(get_name_from_func(func._partialmethod.func))
    elif hasattr(func, "__partialmethod__") and hasattr(func.__partialmethod__, "func"):
        return "partial({})".format(get_name_from_func(func.__partialmethod__.func))

    module = func.__module__

    if hasattr(func, "view_class"):
        view_name = func.view_class.__name__
    elif hasattr(func, "__name__"):
        view_name = func.__name__
    else:  # Fall back if there's no __name__
        view_name = func.__class__.__name__

    return "{0}.{1}".format(module, view_name)


def build_name_with_http_method_prefix(name, request):
    """
    Return ``"<request.method> <name>"``; a falsy ``name`` is returned unchanged.
    """
    return " ".join((request.method, name)) if name else name


def is_master_process() -> bool:
    """
    Return True if the current process is the uwsgi master process.

    Returns False when the ``uwsgi`` module cannot be imported.
    """
    # currently only recognizes uwsgi master process
    try:
        import uwsgi

        return os.getpid() == uwsgi.masterpid()
    except ImportError:
        return False


def _port_or_none(parsed):
    """
    Return ``parsed.port``, or ``None`` if the port in the URL is malformed.

    Accessing ``.port`` on a urllib parse result raises ``ValueError`` for a
    malformed port; callers in this module prefer to treat that as "no port".
    """
    try:
        return parsed.port
    except ValueError:
        return None


def get_url_dict(url: str) -> dict:
    """
    Split ``url`` into a dict with the keys ``full``, ``protocol`` (scheme
    followed by ``":"``), ``hostname`` and ``pathname``.

    ``port`` (as a string) is added when the URL carries a parseable explicit
    port, and ``search`` (query string prefixed with ``"?"``) when the URL has
    a query. A malformed port is omitted rather than raising ``ValueError``.
    """
    parse_result = urllib.parse.urlparse(url)

    url_dict = {
        "full": encoding.keyword_field(url),
        "protocol": parse_result.scheme + ":",
        "hostname": encoding.keyword_field(parse_result.hostname),
        "pathname": encoding.keyword_field(parse_result.path),
    }

    port = _port_or_none(parse_result)
    if port is not None:
        url_dict["port"] = str(port)
    if parse_result.query:
        url_dict["search"] = encoding.keyword_field("?" + parse_result.query)
    return url_dict


def sanitize_url(url: str) -> str:
    """
    Replace the password in the userinfo part of ``url`` with
    ``constants.MASK_URL``.

    URLs without an ``"@"`` or without a password are returned unchanged.
    """
    if "@" not in url:
        return url
    parts = urllib.parse.urlparse(url)
    if parts.password is None:
        # Nothing to mask. Without this guard the literal text "<username>:None"
        # would be searched for and replaced if it happened to occur in the URL.
        return url
    return url.replace("%s:%s" % (parts.username, parts.password), "%s:%s" % (parts.username, constants.MASK_URL))


def get_host_from_url(url: str) -> str:
    """
    Return the hostname of ``url``, followed by ``":<port>"`` if the URL has an
    explicit port that differs from the default port of its scheme.

    If the URL has no hostname, a single space is used in its place. A
    malformed port is ignored rather than raising ``ValueError``.
    """
    parsed_url = urllib.parse.urlparse(url)
    host = parsed_url.hostname or " "

    port = _port_or_none(parsed_url)
    if port and default_ports.get(parsed_url.scheme) != port:
        host += ":" + str(port)

    return host


def url_to_destination_resource(url: str) -> str:
    """
    Return ``"<hostname>:<port>"`` for ``url``.

    If the URL has no (or a malformed) port, the default port of the scheme is
    used when the scheme is listed in ``default_ports``; otherwise only the
    hostname is returned. The hostname is wrapped in brackets when the URL
    contains ``"://["`` (IPv6 literal).
    """
    parts = urllib.parse.urlsplit(url)
    hostname = parts.hostname if parts.hostname else ""
    # preserve brackets for IPv6 URLs
    if "://[" in url:
        hostname = "[%s]" % hostname
    # Malformed port, just use None rather than raising an exception
    port = _port_or_none(parts)
    resource = hostname
    if not port and parts.scheme in default_ports:
        port = default_ports[parts.scheme]
    if port:
        resource += ":%d" % port
    return resource


def read_pem_file(file_obj) -> bytes:
    """
    Extract the first certificate from a PEM file and return it base64-decoded.

    :param file_obj: iterable yielding ``bytes`` lines. Presumably a file
        opened in binary mode, so that the second loop below resumes where the
        first one stopped -- verify against callers.
    :return: decoded certificate bytes; ``b""`` if no certificate body is found
    """
    # skip everything up to and including the first BEGIN CERTIFICATE marker
    for line in file_obj:
        if line.startswith(b"-----BEGIN CERTIFICATE-----"):
            break
    # scan until we find the first END CERTIFICATE marker
    chunks = []
    for line in file_obj:
        if line.startswith(b"-----END CERTIFICATE-----"):
            break
        chunks.append(line.strip())
    return base64.b64decode(b"".join(chunks))


def starmatch_to_regex(pattern: str) -> Pattern:
    """
    Compile a wildcard pattern into a regular expression.

    ``*`` matches any sequence of characters (including newlines); every other
    character is matched literally. The whole string must match. Matching is
    case-insensitive unless the pattern starts with ``(?-i)``, which is
    stripped before compiling.
    """
    options = re.DOTALL
    # check if we are case-sensitive
    if pattern.startswith("(?-i)"):
        pattern = pattern[5:]
    else:
        options |= re.IGNORECASE
    body = "".join(".*" if c == "*" else re.escape(c) for c in pattern)
    return re.compile(r"(?:%s)\Z" % body, options)


def nested_key(d: dict, *args):
    """
    Traverses a dictionary for nested keys. Returns `None` if at any point
    in the traversal a key cannot be found.

    Example:

        >>> from elasticapm.utils import nested_key
        >>> d = {"a": {"b": {"c": 0}}}
        >>> nested_key(d, "a", "b", "c")
        0
        >>> nested_key(d, "a", "b", "d")
        None
    """
    for arg in args:
        try:
            d = d[arg]
        except (TypeError, KeyError):
            d = None
            break
    return d


def getfqdn() -> str:
    """
    socket.getfqdn() has some issues. For one, it's slow (may do a DNS lookup).
    For another, it can return `localhost.localdomain`[1], which is less useful
    than socket.gethostname().

    This function handles the fallbacks and also ensures we don't try to
    lookup the fqdn more than once.

    [1]: https://stackoverflow.com/a/43330159
    """
    global fqdn
    if not fqdn:
        fqdn = socket.getfqdn()
        if fqdn == "localhost.localdomain":
            fqdn = socket.gethostname()
        # fall back to the environment if the socket lookups yielded nothing
        if not fqdn:
            fqdn = os.environ.get("HOSTNAME")
        if not fqdn:
            fqdn = os.environ.get("HOST")
        if fqdn is None:
            fqdn = ""
        fqdn = fqdn.lower().strip()
    return fqdn