server/app/lib/middleware.py (122 lines of code) (raw):

#!/usr/bin/env python3 # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. """ASF Infrastructure Reporting Dashboard - Quart Middleware""" if not __debug__: raise RuntimeError("This code requires assert statements to be enabled") import sys import traceback import typing import uuid import quart from . import config import werkzeug.routing import asyncio import functools import aiohttp import time import json async def consume_body(): """Consumes the request body, punting it to dev-null. This is required for httpd to not throw 502 at error""" # See: https://bz.apache.org/bugzilla/show_bug.cgi?id=55433 async for _data in quart.request.body: pass def glued(func: typing.Callable) -> typing.Callable: """Middleware that collects all form data (except file uploads!) and joins as one dict""" async def call(**args): form_data = dict() form_data.update(quart.request.args.to_dict()) xform = await quart.request.form # Pre-parse check for form data size if quart.request.content_type and any( x in quart.request.content_type for x in ( "multipart/form-data", "application/x-www-form-urlencoded", "application/x-url-encoded", ) ): if quart.request.content_length > config.server.max_form_size: await consume_body() return quart.Response( status=413, response=f"Request content length ({quart.request.content_length} bytes) is larger than what is permitted for form data ({config.server.max_form_size} bytes)!", ) if xform: form_data.update(xform.to_dict()) if quart.request.is_json: xjson = await quart.request.json form_data.update(xjson) try: resp = await func(form_data, **args) assert resp, "No response was provided by the underlying endpoint!" except Exception: # Catch and spit out errors exc_type, exc_value, exc_traceback = sys.exc_info() err = "\n".join(traceback.format_exception(exc_type, exc_value, exc_traceback)) headers = { "Server": "ASF Infra Reporting Dashboard", "Content-Type": "text/plain", } # By default, we print the traceback to the user, for easy debugging. if config.server.error_reporting == "json": error_text = "API error occurred: \n" + err return quart.Response(headers=headers, status=500, response=error_text) # If client traceback is disabled, we print it to stderr instead, but leave an # error ID for the client to report back to the admin. Every line of the traceback # will have this error ID at the beginning of the line, for easy grepping. else: # We only need a short ID here, let's pick 18 chars. eid = str(uuid.uuid4())[:18] sys.stderr.write("API Endpoint %s got into trouble (%s): \n" % (quart.request.path, eid)) for line in err.split("\n"): sys.stderr.write("%s: %s\n" % (eid, line)) return quart.Response( headers=headers, status=500, response="API error occurred. The application journal will have information. Error ID: %s" % eid, ) # If an error is thrown before the request body has been consumed, eat it quietly. if not quart.request.body._complete.is_set(): await consume_body() return resp # Quart will, if no rule name is specified, default to calling the rule "call" here, # which leads to carps about duplicate rule definitions. So, given the fact that call() # is dynamically made from within this function, we simply adjust its internal name to # refer to the calling module and function, thus providing Quart with a much better # name for the rule, which will also aid in debugging. call.__name__ = func.__module__ + "." + func.__name__ return call def auth_failed(): """Returns the appropriate authorization failure response, depending on auth mechanism supplied.""" if "x-artifacts-webui" not in quart.request.headers: # Not done via Web UI, standard 401 response headers = {"WWW-Authenticate": 'Basic realm="infra-reports-ec2-va.apache.org"'} return quart.Response(status=401, headers=headers, response="Please authenticate yourself first!\n") else: # Web UI response, do not send Realm header (we do not want a pop-up in our browser!) return quart.Response(status=401, response="Please authenticate yourself first!\n") class FilenameConverter(werkzeug.routing.BaseConverter): """Simple converter that splits a filename into a basename and an extension""" regex = r"^[^/.]*(\.[A-Za-z0-9]+)?$" part_isolating = False def to_python(self, filename): extension = "" # If foo.bar, split into base and ext. Otherwise, keep filename as full string (even for .htaccess etc) if "." in filename[1:]: filename, extension = filename.split(".", maxsplit=1) return filename, extension async def reset_rate_limits(): """Reset daily rate limits for lookups""" while True: await asyncio.sleep(86400) config.rate_limits.clear() def rate_limited(func): """Decorator for calls that are rate-limited for anonymous users. Once the number of requests per day has been exceeded, this decorator will return a 429 HTTP response to the client instead. """ @functools.wraps(func) async def session_wrapper(*args): ip = quart.request.headers.get("X-Forwarded-For", quart.request.remote_addr).split(",")[-1].strip() usage = config.rate_limits.get(ip, 0) + 1 if config.server.rate_limit_per_ip and usage > config.server.rate_limit_per_ip: return quart.Response( status=429, response="Your request has been rate-limited. Please check back tomorrow!" ) config.rate_limits[ip] = usage print(ip, usage) return await func(*args) return session_wrapper class CachedJson: """A simple JSON URL fetcher with a built-in cache. Once a cached JSON element expires, any subsequent reads will cause the class to re-fetch the object. If the object cannot be fetched, a stale version will be returned instead. Usage example: foo = CachedJson("https://www.apache.org/foo.json") jsondata = await foo.json # do stuff here.... jsondata = await foo.json # <- will either use cache or fetch again if out of date """ def __init__(self, url: str, expiry: int = 30): self.url = url self.expiry = expiry self.last = 0 self.timeout = aiohttp.ClientTimeout(total=30) self._cache = None @property async def json(self): now = time.time() if now > (self.last + self.expiry): # Cache expired? try: async with aiohttp.ClientSession(timeout=self.timeout) as hc: async with hc.get(self.url) as req: if req.status == 200: source_json = await req.json() if source_json: self._cache = source_json self.last = now except (aiohttp.ClientError, json.JSONDecodeError) as e: print(f"Could not fetch URL {self.url} for caching, will use stale checkout: {e}.") return self._cache