azure/functions/_http_asgi.py (213 lines of code) (raw):
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from typing import Dict, List, Tuple, Optional, Any, Union
import logging
import asyncio
from asyncio import Event, Queue
from urllib.parse import ParseResult, urlparse
from warnings import warn
from wsgiref.headers import Headers
from ._abc import Context
from ._http import HttpRequest, HttpResponse
from ._http_wsgi import WsgiRequest
# Version strings advertised to ASGI applications. They are copied into the
# "asgi.version" and "asgi.spec_version" entries of both the HTTP scope and
# the lifespan scope built in this module.
ASGI_VERSION = "2.1"
ASGI_SPEC_VERSION = "2.1"
class AsgiRequest(WsgiRequest):
    """Expose an Azure Functions HTTP request as an ASGI ``http`` scope.

    The constructor records the ASGI version strings, the request headers
    and the URL scheme, then defers to ``WsgiRequest.__init__``. The
    remaining attributes read by :meth:`to_asgi_http_scope` (``path_info``,
    ``query_string``, ``af_*`` and so on) are not assigned in this class;
    presumably they are populated by the base class -- verify there.
    """

    def __init__(self, func_req: HttpRequest,
                 func_ctx: Optional[Context] = None):
        """Capture ASGI-specific request data, then run the base initialiser.

        :param func_req: The incoming Azure Functions HTTP request.
        :param func_ctx: The invocation context, if one is available.
        """
        parsed_url: ParseResult = urlparse(func_req.url)
        self.asgi_version = ASGI_VERSION
        self.asgi_spec_version = ASGI_SPEC_VERSION
        self._headers = func_req.headers
        self.asgi_url_scheme = parsed_url.scheme
        super().__init__(func_req, func_ctx)

    def _get_encoded_http_headers(self) -> List[Tuple[bytes, bytes]]:
        """Return the request headers as UTF-8 encoded (name, value) pairs."""
        encoded_pairs = []
        for name, value in self._headers.items():
            encoded_pairs.append((name.encode("utf8"), value.encode("utf8")))
        return encoded_pairs

    def _get_server_address(self):
        """Return ``(server_name, port)`` or ``None`` without a server name."""
        if self.server_name is None:
            return None
        return (self.server_name, int(self.server_port))

    def to_asgi_http_scope(self):
        """Build the scope dictionary handed to the ASGI application.

        ``raw_path`` and ``query_string`` are UTF-8 encoded and fall back to
        empty bytes when the underlying value is ``None``. ``client`` is
        always ``None``: the client name and port are not available here.

        NOTE(review): the version entries use flat ``"asgi.version"`` and
        ``"asgi.spec_version"`` keys rather than a nested ``"asgi"`` mapping;
        confirm this is what consuming applications expect.
        """
        path = self.path_info
        raw_path = b'' if path is None else path.encode("utf-8")

        query = self.query_string
        encoded_query = b'' if query is None else query.encode("utf-8")

        scope = {
            "type": "http",
            "asgi.version": self.asgi_version,
            "asgi.spec_version": self.asgi_spec_version,
            "http_version": "1.1",
            "method": self.request_method,
            "scheme": self.asgi_url_scheme,
            "path": self.path_info,
            "raw_path": raw_path,
            "query_string": encoded_query,
            "root_path": self.script_name,
            "headers": self._get_encoded_http_headers(),
            "server": self._get_server_address(),
            # Client name and port are not known to this adapter.
            "client": None,
            "azure_functions.function_directory": self.af_function_directory,
            "azure_functions.function_name": self.af_function_name,
            "azure_functions.invocation_id": self.af_invocation_id,
            "azure_functions.thread_local_storage":
                self.af_thread_local_storage,
            "azure_functions.trace_context": self.af_trace_context,
            "azure_functions.retry_context": self.af_retry_context
        }
        return scope
class AsgiResponse:
    """Collect the response an ASGI application produces for one request.

    An instance supplies the ``receive`` and ``send`` callables passed to the
    application. ``receive`` hands over the request body once; ``send``
    records the status, headers and body chunks so they can later be turned
    into an ``HttpResponse`` by :meth:`to_func_response`.
    """

    def __init__(self):
        self._status_code = 0
        self._headers: Union[Headers, Dict] = {}
        # Body chunks in the order the application sent them.
        self._buffer: List[bytes] = []
        # Set to None once the body has been delivered through _receive().
        self._request_body: Optional[bytes] = b""
        # True once a body message without "more_body" has been seen.
        self._has_received_response: bool = False

    @classmethod
    async def from_app(cls, app, scope: Dict[str, Any],
                       body: bytes) -> "AsgiResponse":
        """Run ``app`` for a single request and return the captured response.

        :param app: The ASGI application callable.
        :param scope: The ASGI connection scope passed to the application.
        :param body: The complete request body.
        :return: The populated response collector.
        """
        res = cls()
        res._request_body = body
        await app(scope, res._receive, res._send)
        return res

    def to_func_response(self) -> "HttpResponse":
        """Convert the captured status, headers and body to an HttpResponse."""
        lowercased_headers = {k.lower(): v for k, v in self._headers.items()}
        # NOTE(review): Content-Encoding normally names a compression scheme,
        # not a character set, so passing it as ``charset`` looks unintended.
        # Behaviour is kept as-is; confirm against HttpResponse before
        # changing it.
        return HttpResponse(
            body=b"".join(self._buffer),
            status_code=self._status_code,
            headers=self._headers,  # type: ignore
            mimetype=lowercased_headers.get("content-type"),
            charset=lowercased_headers.get("content-encoding"),
        )

    def _handle_http_response_start(self, message: Dict[str, Any]):
        """Record the status code and headers of the response."""
        # "headers" is optional in an http.response.start message; a missing
        # entry is treated as an empty header list.
        self._headers = Headers(
            [(k.decode(), v.decode())
             for k, v in message.get("headers", [])])
        self._status_code = message["status"]

    def _handle_http_response_body(self, message: Dict[str, Any]):
        """Append one body chunk and note whether the response is complete."""
        # "body" is optional in an http.response.body message and defaults
        # to empty bytes.
        self._buffer.append(message.get("body", b""))
        self._has_received_response = not message.get("more_body", False)
        # XXX : Chunked bodies not supported, see
        # https://github.com/Azure/azure-functions-host/issues/4926

    async def _receive(self):
        """ASGI ``receive`` callable.

        The first call returns the whole request body as one
        ``http.request`` message. Later calls wait until the response has
        been fully sent and then report ``http.disconnect``.
        """
        if self._request_body is not None:
            reply = {
                "type": "http.request",
                "body": self._request_body,
                "more_body": False,
            }
            self._request_body = None
            return reply

        # Poll until the application has finished sending the response.
        while not self._has_received_response:
            await asyncio.sleep(0.1)
        return {
            "type": "http.disconnect",
        }

    async def _send(self, message):
        """ASGI ``send`` callable; dispatches on the message type.

        Message types other than the response start and body are ignored.
        """
        logging.debug("Received %s from ASGI worker.", message)
        if message["type"] == "http.response.start":
            self._handle_http_response_start(message)
        elif message["type"] == "http.response.body":
            self._handle_http_response_body(message)
        elif message["type"] == "http.disconnect":
            pass  # Nothing todo here
class AsgiMiddleware:
    """This middleware is to adapt an ASGI supported Python server
    framework into Azure Functions. It can be used by either calling the
    .handle() function or exposing the .main property in a HttpTrigger.
    """
    _logger = logging.getLogger('azure.functions.AsgiMiddleware')
    # Class-wide flag so the startup debug message is logged only once.
    _usage_reported = False

    def __init__(self, app):
        """Instantiate an ASGI middleware to convert Azure Functions HTTP
        request into ASGI Python object. Example on handling ASGI app in a HTTP
        trigger by overwriting the .main() method:

            import azure.functions as func

            from FastapiApp import app

            main = func.AsgiMiddleware(app).main
        """
        if not self._usage_reported:
            self._logger.debug("Starting Azure Functions ASGI middleware.")
            # Set the flag on the class: assigning through ``self`` would
            # only create an instance attribute and the message would be
            # logged again for every new middleware instance.
            AsgiMiddleware._usage_reported = True

        self._app = app
        self.main = self._handle
        # Shared with the application through the lifespan scope.
        self.state = {}
        # The lifespan primitives are created lazily in notify_startup().
        self.lifespan_receive_queue: Optional[Queue] = None
        self.lifespan_startup_event: Optional[Event] = None
        self.lifespan_shutdown_event: Optional[Event] = None
        self._startup_succeeded = False
        # Strong reference to the background lifespan task.
        self._lifespan_task: Optional[asyncio.Task] = None

    def handle(self, req: "HttpRequest",
               context: Optional["Context"] = None):
        """Deprecated. Please use handle_async instead:

            import azure.functions as func

            from FastapiApp import app

            async def main(req, context):
                return await func.AsgiMiddleware(app).handle_async(req, context)
        """
        warn("handle() is deprecated. Please await .handle_async() instead.",
             DeprecationWarning, stacklevel=2)
        self._logger.warning(
            "handle() is deprecated. Please `await .handle_async()` instead.")
        return self._handle(req, context)

    def _handle(self, req, context):
        """Synchronously run the ASGI app for ``req`` and return its response.

        The application is driven with ``asyncio.run()``, which starts a new
        event loop for the duration of the call.
        """
        asgi_request = AsgiRequest(req, context)
        scope = asgi_request.to_asgi_http_scope()
        asgi_response = asyncio.run(
            AsgiResponse.from_app(self._app, scope, req.get_body())
        )
        return asgi_response.to_func_response()

    async def handle_async(self,
                           req: "HttpRequest",
                           context: Optional["Context"] = None):
        """Method to convert an Azure Functions HTTP request into a ASGI
        Python object. Example on handling ASGI app in a HTTP trigger by
        calling .handle_async() in .main() method:

            import azure.functions as func

            from FastapiApp import app

            async def main(req, context):
                return await func.AsgiMiddleware(app).handle_async(req,
                                                                   context)
        """
        return await self._handle_async(req, context)

    async def _handle_async(self, req, context):
        """Run the ASGI app for ``req`` on the current event loop."""
        asgi_request = AsgiRequest(req, context)
        scope = asgi_request.to_asgi_http_scope()
        asgi_response = await AsgiResponse.from_app(self._app,
                                                    scope,
                                                    req.get_body())
        return asgi_response.to_func_response()

    async def _lifespan_receive(self):
        """ASGI ``receive`` callable for the lifespan scope.

        :raises RuntimeError: If notify_startup() has not been called yet.
        """
        if self.lifespan_receive_queue is None:
            raise RuntimeError("notify_startup() must be called first.")
        return await self.lifespan_receive_queue.get()

    async def _lifespan_send(self, message):
        """ASGI ``send`` callable for the lifespan scope.

        Sets the startup or shutdown event according to the message type,
        records whether startup succeeded and logs failures.

        :raises RuntimeError: If notify_startup() has not been called yet.
        """
        logging.debug("Received lifespan message %s.", message)
        if (self.lifespan_startup_event is None
                or self.lifespan_shutdown_event is None):
            raise RuntimeError("notify_startup() must be called first.")

        message_type = message["type"]
        if message_type == "lifespan.startup.complete":
            self._startup_succeeded = True
            self.lifespan_startup_event.set()
        elif message_type == "lifespan.shutdown.complete":
            self.lifespan_shutdown_event.set()
        elif message_type == "lifespan.startup.failed":
            self._startup_succeeded = False
            self.lifespan_startup_event.set()
            if message.get("message"):
                self._logger.error("Failed ASGI startup with message '%s'.",
                                   message["message"])
            else:
                self._logger.error("Failed ASGI startup event.")
        elif message_type == "lifespan.shutdown.failed":
            self.lifespan_shutdown_event.set()
            if message.get("message"):
                self._logger.error("Failed ASGI shutdown with message '%s'.",
                                   message["message"])
            else:
                self._logger.error("Failed ASGI shutdown event.")

    async def _lifespan_main(self):
        """Run the application's lifespan protocol until it returns.

        :raises RuntimeError: If notify_startup() has not been called yet.
        """
        scope = {
            "type": "lifespan",
            "asgi.version": ASGI_VERSION,
            "asgi.spec_version": ASGI_SPEC_VERSION,
            "state": self.state,
        }
        if (self.lifespan_startup_event is None
                or self.lifespan_shutdown_event is None):
            raise RuntimeError("notify_startup() must be called first.")
        try:
            await self._app(scope, self._lifespan_receive, self._lifespan_send)
        finally:
            # Release any waiter even when the application returns early or
            # raises, so notify_startup()/notify_shutdown() cannot hang.
            self.lifespan_startup_event.set()
            self.lifespan_shutdown_event.set()

    async def notify_startup(self):
        """Notify the ASGI app that the server has started.

        :return: True if the application reported a successful startup,
            False otherwise.
        """
        self._logger.debug("Notifying ASGI app of startup.")

        # Initialize signals and queues
        if self.lifespan_receive_queue is None:
            self.lifespan_receive_queue = Queue()
        if self.lifespan_startup_event is None:
            self.lifespan_startup_event = Event()
        if self.lifespan_shutdown_event is None:
            self.lifespan_shutdown_event = Event()

        startup_event = {"type": "lifespan.startup"}
        await self.lifespan_receive_queue.put(startup_event)
        # Keep a strong reference: the event loop only holds weak references
        # to tasks, so an unreferenced task may be garbage-collected before
        # the lifespan protocol has finished.
        self._lifespan_task = asyncio.create_task(self._lifespan_main())
        await self.lifespan_startup_event.wait()
        return self._startup_succeeded

    async def notify_shutdown(self):
        """Notify the ASGI app that the server is shutting down.

        :raises RuntimeError: If notify_startup() has not been called yet.
        """
        if (self.lifespan_receive_queue is None
                or self.lifespan_shutdown_event is None):
            raise RuntimeError("notify_startup() must be called first.")

        self._logger.debug("Notifying ASGI app of shutdown.")

        shutdown_event = {"type": "lifespan.shutdown"}
        await self.lifespan_receive_queue.put(shutdown_event)
        await self.lifespan_shutdown_event.wait()