in azure/functions/_http_wsgi.py [0:0]
def __init__(self,
             func_req: HttpRequest,
             func_ctx: Optional[Context] = None):
    """Capture an HTTP request as WSGI environ-style attributes.

    The URL, headers and body of ``func_req`` are read once and stored on
    the instance under names that mirror the PEP 3333 environ keys.
    Invocation details are copied from ``func_ctx``; each of them falls
    back to ``None`` when the context is missing or lacks the attribute.

    :param func_req: The incoming HTTP request.
    :param func_ctx: The invocation context, if one is available.
    """
    parsed_url: ParseResult = urlparse(func_req.url)
    # A falsy body (None or empty) is normalised to empty bytes
    body = func_req.get_body() or b''

    # Keep a copy of the request headers keyed by lowercase name
    lowercased = {}
    for header_name, header_value in func_req.headers.items():
        lowercased[header_name.lower()] = header_value
    self._lowercased_headers = lowercased

    # Attributes backing the PEP 3333 environ
    self.request_method = getattr(func_req, 'method', None)
    self.script_name = ''

    # Percent-decode the path into raw bytes, then turn it back into text.
    # latin-1 is used only when the instance is exactly a WsgiRequest;
    # any other type decodes the path as utf-8.
    raw_path = unquote_to_bytes(parsed_url.path)
    if type(self) is WsgiRequest:
        path_encoding = 'latin-1'
    else:
        path_encoding = 'utf-8'
    self.path_info = raw_path.decode(path_encoding)

    self.query_string = parsed_url.query
    self.content_type = lowercased.get('content-type')
    # The length is taken from the body actually read, not from a header
    self.content_length = str(len(body))
    self.server_name = parsed_url.hostname
    self.server_port = str(self._get_port(parsed_url, lowercased))
    self.server_protocol = 'HTTP/1.1'

    # Propagate http request headers into HTTP_ environ
    self._http_environ: Dict[str, str] = self._get_http_headers(
        func_req.headers
    )

    # Attributes backing the wsgi.* environ
    self.wsgi_version = (1, 0)
    self.wsgi_url_scheme = parsed_url.scheme
    self.wsgi_input = BytesIO(body)
    self.wsgi_multithread = False
    self.wsgi_multiprocess = False
    self.wsgi_run_once = False

    # Azure Functions context; getattr keeps this safe when func_ctx is
    # None or does not provide a given attribute
    self.af_function_directory = getattr(
        func_ctx, 'function_directory', None)
    self.af_function_name = getattr(func_ctx, 'function_name', None)
    self.af_invocation_id = getattr(func_ctx, 'invocation_id', None)
    self.af_thread_local_storage = getattr(
        func_ctx, 'thread_local_storage', None)
    self.af_trace_context = getattr(func_ctx, 'trace_context', None)
    self.af_retry_context = getattr(func_ctx, 'retry_context', None)