elasticapm/utils/wsgi.py (45 lines of code) (raw):

# BSD 3-Clause License
#
# Copyright (c) 2019, Elasticsearch BV
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
#   list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
#   this list of conditions and the following disclaimer in the documentation
#   and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
#   contributors may be used to endorse or promote products derived from
#   this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""
This module implements WSGI related helpers adapted from ``werkzeug.wsgi``

:copyright: (c) 2010 by the Werkzeug Team, see AUTHORS for more details.
:license: BSD, see LICENSE for more details.
"""

from urllib.parse import quote


# `get_headers` comes from `werkzeug.datastructures.EnvironHeaders`
def get_headers(environ):
    """
    Returns only proper HTTP headers.

    Yields ``(name, value)`` pairs with the name lower-cased and dashed
    (``HTTP_USER_AGENT`` becomes ``user-agent``).

    :param environ: the WSGI environment to extract the headers from.
    """
    for key, value in environ.items():
        key = str(key)
        # `HTTP_CONTENT_TYPE` / `HTTP_CONTENT_LENGTH` are skipped on purpose:
        # only the un-prefixed CGI keys handled below are reported for them.
        if key.startswith("HTTP_") and key not in ("HTTP_CONTENT_TYPE", "HTTP_CONTENT_LENGTH"):
            yield key[5:].replace("_", "-").lower(), value
        elif key in ("CONTENT_TYPE", "CONTENT_LENGTH"):
            yield key.replace("_", "-").lower(), value


def get_environ(environ):
    """
    Returns our whitelisted environment variables.

    Yields ``(key, value)`` pairs for ``REMOTE_ADDR``, ``SERVER_NAME`` and
    ``SERVER_PORT``, skipping any that are absent.

    :param environ: the WSGI environment to read from.
    """
    for key in ("REMOTE_ADDR", "SERVER_NAME", "SERVER_PORT"):
        if key in environ:
            yield key, environ[key]


# `get_host` comes from `werkzeug.wsgi`
def get_host(environ):
    """Return the real host for the given WSGI environment. This takes care
    of the `X-Forwarded-Host` header.

    The host is taken from ``HTTP_X_FORWARDED_HOST``, then ``HTTP_HOST``, and
    finally from ``SERVER_NAME`` plus ``SERVER_PORT``. A port that is the
    default for the URL scheme (80 for http, 443 for https) is left out.

    :param environ: the WSGI environment to get the host of.
    :raises KeyError: if neither host header is present and ``SERVER_NAME``
                      or ``SERVER_PORT`` is missing.
    """
    scheme = environ.get("wsgi.url_scheme")
    if "HTTP_X_FORWARDED_HOST" in environ:
        result = environ["HTTP_X_FORWARDED_HOST"]
    elif "HTTP_HOST" in environ:
        result = environ["HTTP_HOST"]
    else:
        result = environ["SERVER_NAME"]
        # Normalise the port once: the comparison already tolerated a
        # non-string port, so the concatenation has to tolerate it as well.
        port = str(environ["SERVER_PORT"])
        if (scheme, port) not in (("https", "443"), ("http", "80")):
            result += ":" + port
    # Host headers may carry an explicit default port; strip it.
    if result.endswith(":80") and scheme == "http":
        result = result[:-3]
    elif result.endswith(":443") and scheme == "https":
        result = result[:-4]
    return result


# `get_current_url` comes from `werkzeug.wsgi`
def get_current_url(environ, root_only=False, strip_querystring=False, host_only=False, path_only=False):
    """A handy helper function that recreates the full URL for the current
    request or parts of it.  Here an example:

    >>> from werkzeug import create_environ
    >>> env = create_environ("/?param=foo", "http://localhost/script")
    >>> get_current_url(env)
    'http://localhost/script/?param=foo'
    >>> get_current_url(env, root_only=True)
    'http://localhost/script/'
    >>> get_current_url(env, host_only=True)
    'http://localhost/'
    >>> get_current_url(env, strip_querystring=True)
    'http://localhost/script/'

    :param environ: the WSGI environment to get the current URL from.
    :param root_only: set `True` if you only want the root URL.
    :param strip_querystring: set to `True` if you don't want the querystring.
    :param host_only: set to `True` if the host URL should be returned.
    :param path_only: set to `True` if only the path should be returned.
    """
    if path_only:
        tmp = []
    else:
        tmp = [environ["wsgi.url_scheme"], "://", get_host(environ)]
    cat = tmp.append
    if host_only:
        return "".join(tmp) + "/"
    cat(quote(environ.get("SCRIPT_NAME", "").rstrip("/")))
    if root_only:
        cat("/")
    else:
        cat(quote("/" + environ.get("PATH_INFO", "").lstrip("/")))
        # The query string is appended verbatim (not re-quoted) and only
        # when a full, non-root URL is being built.
        if not strip_querystring:
            qs = environ.get("QUERY_STRING")
            if qs:
                cat("?" + qs)
    return "".join(tmp)