azure_functions_worker/http_v2.py (191 lines of code) (raw):

# Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. import abc import asyncio import importlib import socket import sys from typing import Dict from azure_functions_worker.constants import ( BASE_EXT_SUPPORTED_PY_MINOR_VERSION, PYTHON_ENABLE_INIT_INDEXING, X_MS_INVOCATION_ID, ) from azure_functions_worker.logging import logger from azure_functions_worker.utils.common import is_envvar_false # Http V2 Exceptions class HttpServerInitError(Exception): """Exception raised when there is an error during HTTP server initialization.""" class MissingHeaderError(ValueError): """Exception raised when a required header is missing in the HTTP request.""" class BaseContextReference(abc.ABC): """ Base class for context references. """ def __init__(self, event_class, http_request=None, http_response=None, function=None, fi_context=None, args=None, http_trigger_param_name=None): self._http_request = http_request self._http_response = http_response self._function = function self._fi_context = fi_context self._args = args self._http_trigger_param_name = http_trigger_param_name self._http_request_available_event = event_class() self._http_response_available_event = event_class() @property def http_request(self): return self._http_request @http_request.setter def http_request(self, value): self._http_request = value self._http_request_available_event.set() @property def http_response(self): return self._http_response @http_response.setter def http_response(self, value): self._http_response = value self._http_response_available_event.set() @property def function(self): return self._function @function.setter def function(self, value): self._function = value @property def fi_context(self): return self._fi_context @fi_context.setter def fi_context(self, value): self._fi_context = value @property def http_trigger_param_name(self): return self._http_trigger_param_name @http_trigger_param_name.setter def http_trigger_param_name(self, value): self._http_trigger_param_name = value @property def args(self): return self._args @args.setter def args(self, value): self._args = value @property def http_request_available_event(self): return self._http_request_available_event @property def http_response_available_event(self): return self._http_response_available_event class AsyncContextReference(BaseContextReference): """ Asynchronous context reference class. """ def __init__(self, http_request=None, http_response=None, function=None, fi_context=None, args=None): super().__init__(event_class=asyncio.Event, http_request=http_request, http_response=http_response, function=function, fi_context=fi_context, args=args) self.is_async = True class SingletonMeta(type): """ Metaclass for implementing the singleton pattern. """ _instances = {} def __call__(cls, *args, **kwargs): if cls not in cls._instances: cls._instances[cls] = super().__call__(*args, **kwargs) return cls._instances[cls] class HttpCoordinator(metaclass=SingletonMeta): """ HTTP coordinator class for managing HTTP v2 requests and responses. """ def __init__(self): self._context_references: Dict[str, BaseContextReference] = {} def set_http_request(self, invoc_id, http_request): if invoc_id not in self._context_references: self._context_references[invoc_id] = AsyncContextReference() context_ref = self._context_references.get(invoc_id) context_ref.http_request = http_request def set_http_response(self, invoc_id, http_response): if invoc_id not in self._context_references: raise KeyError("No context reference found for invocation %s" % invoc_id) context_ref = self._context_references.get(invoc_id) context_ref.http_response = http_response async def get_http_request_async(self, invoc_id): if invoc_id not in self._context_references: self._context_references[invoc_id] = AsyncContextReference() await self._context_references.get( invoc_id).http_request_available_event.wait() return self._pop_http_request(invoc_id) async def await_http_response_async(self, invoc_id): if invoc_id not in self._context_references: raise KeyError("No context reference found for invocation %s" % invoc_id) await self._context_references.get( invoc_id).http_response_available_event.wait() return self._pop_http_response(invoc_id) def _pop_http_request(self, invoc_id): context_ref = self._context_references.get(invoc_id) request = context_ref.http_request if request is not None: context_ref.http_request = None return request raise ValueError("No http request found for invocation %s" % invoc_id) def _pop_http_response(self, invoc_id): context_ref = self._context_references.get(invoc_id) response = context_ref.http_response if response is not None: context_ref.http_response = None return response raise ValueError("No http response found for invocation %s" % invoc_id) def get_unused_tcp_port(): # Create a TCP socket tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Bind it to a free port provided by the OS tcp_socket.bind(("", 0)) # Get the port number port = tcp_socket.getsockname()[1] # Close the socket tcp_socket.close() # Return the port number return port def initialize_http_server(host_addr, **kwargs): """ Initialize HTTP v2 server for handling HTTP requests. """ try: ext_base = HttpV2Registry.ext_base() web_extension_mod_name = ext_base.ModuleTrackerMeta.get_module() extension_module = importlib.import_module(web_extension_mod_name) web_app_class = extension_module.WebApp web_server_class = extension_module.WebServer unused_port = get_unused_tcp_port() app = web_app_class() request_type = ext_base.RequestTrackerMeta.get_request_type() @app.route async def catch_all(request: request_type): # type: ignore invoc_id = request.headers.get(X_MS_INVOCATION_ID) if invoc_id is None: raise MissingHeaderError("Header %s not found" % X_MS_INVOCATION_ID) logger.info('Received HTTP request for invocation %s', invoc_id) http_coordinator.set_http_request(invoc_id, request) http_resp = \ await http_coordinator.await_http_response_async(invoc_id) logger.info('Sending HTTP response for invocation %s', invoc_id) # if http_resp is an python exception, raise it if isinstance(http_resp, Exception): raise http_resp return http_resp web_server = web_server_class(host_addr, unused_port, app) web_server_run_task = web_server.serve() loop = asyncio.get_event_loop() loop.create_task(web_server_run_task) web_server_address = f"http://{host_addr}:{unused_port}" logger.info('HTTP server starting on %s', web_server_address) return web_server_address except Exception as e: raise HttpServerInitError("Error initializing HTTP server: %s" % e) \ from e async def sync_http_request(http_request, func_http_request): # Sync http request route params from invoc_request to http_request (HttpV2Registry.ext_base().RequestTrackerMeta .get_synchronizer() .sync_route_params(http_request, func_http_request.route_params)) class HttpV2Registry: """ HTTP v2 registry class for managing HTTP v2 states. """ _http_v2_enabled = False _ext_base = None _http_v2_enabled_checked = False @classmethod def http_v2_enabled(cls, **kwargs): # Check if HTTP/2 enablement has already been checked if not cls._http_v2_enabled_checked: # If not checked yet, mark as checked cls._http_v2_enabled_checked = True cls._http_v2_enabled = cls._check_http_v2_enabled() # Return the result of HTTP/2 enablement return cls._http_v2_enabled @classmethod def ext_base(cls): return cls._ext_base @classmethod def _check_http_v2_enabled(cls): if sys.version_info.minor < BASE_EXT_SUPPORTED_PY_MINOR_VERSION or \ is_envvar_false(PYTHON_ENABLE_INIT_INDEXING): return False import azurefunctions.extensions.base as ext_base cls._ext_base = ext_base return cls._ext_base.HttpV2FeatureChecker.http_v2_enabled() http_coordinator = HttpCoordinator()