azure_functions_worker/bindings/meta.py

# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import os
import sys
import typing

from .. import protos
from ..constants import (
    BASE_EXT_SUPPORTED_PY_MINOR_VERSION,
    CUSTOMER_PACKAGES_PATH,
    HTTP,
    HTTP_TRIGGER,
)
from ..http_v2 import HttpV2Registry
from ..logging import logger
from . import datumdef, generic
from .shared_memory_data_transfer import SharedMemoryManager

PB_TYPE = 'rpc_data'
PB_TYPE_DATA = 'data'
PB_TYPE_RPC_SHARED_MEMORY = 'rpc_shared_memory'

BINDING_REGISTRY = None
DEFERRED_BINDING_REGISTRY = None
deferred_bindings_cache = {}


def _check_http_input_type_annotation(bind_name: str, pytype: type,
                                      is_deferred_binding: bool) -> bool:
    if HttpV2Registry.http_v2_enabled():
        return HttpV2Registry.ext_base().RequestTrackerMeta \
            .check_type(pytype)

    binding = get_binding(bind_name, is_deferred_binding)
    return binding.check_input_type_annotation(pytype)


def _check_http_output_type_annotation(bind_name: str, pytype: type) -> bool:
    if HttpV2Registry.http_v2_enabled():
        return HttpV2Registry.ext_base().ResponseTrackerMeta.check_type(pytype)

    binding = get_binding(bind_name)
    return binding.check_output_type_annotation(pytype)


INPUT_TYPE_CHECK_OVERRIDE_MAP = {
    HTTP_TRIGGER: _check_http_input_type_annotation
}

OUTPUT_TYPE_CHECK_OVERRIDE_MAP = {
    HTTP: _check_http_output_type_annotation
}


def load_binding_registry() -> None:
    """
    Tries to load azure-functions from the customer's BYO. If it's
    not found, it loads the builtin. If the BINDING_REGISTRY is None,
    azure-functions hasn't been loaded in properly.

    Tries to load the base extension only for python 3.8+.
    """

    func = sys.modules.get('azure.functions')

    if func is None:
        import azure.functions as func

    global BINDING_REGISTRY
    BINDING_REGISTRY = func.get_binding_registry()

    if BINDING_REGISTRY is None:
        raise AttributeError('BINDING_REGISTRY is None. azure-functions '
                             'library not found. Sys Path: %s. '
                             'Sys Modules: %s. '
                             'python-packages Path exists: %s.',
                             sys.path, sys.modules,
                             os.path.exists(CUSTOMER_PACKAGES_PATH))

    if sys.version_info.minor >= BASE_EXT_SUPPORTED_PY_MINOR_VERSION:
        try:
            import azurefunctions.extensions.base as clients
            global DEFERRED_BINDING_REGISTRY
            DEFERRED_BINDING_REGISTRY = clients.get_binding_registry()
        except ImportError:
            logger.debug('Base extension not found. '
                         'Python version: 3.%s, Sys path: %s, '
                         'Sys Module: %s, python-packages Path exists: %s.',
                         sys.version_info.minor, sys.path, sys.modules,
                         os.path.exists(CUSTOMER_PACKAGES_PATH))


def get_binding(bind_name: str,
                is_deferred_binding: typing.Optional[bool] = False) -> object:
    """
    First checks if the binding is a non-deferred binding. This is the most
    common case.
    Second checks if the binding is a deferred binding.
    If the binding is neither, it's a generic type.
    """
    binding = None

    if binding is None and not is_deferred_binding:
        binding = BINDING_REGISTRY.get(bind_name)
    if binding is None and is_deferred_binding:
        binding = DEFERRED_BINDING_REGISTRY.get(bind_name)
    if binding is None:
        binding = generic.GenericBinding

    return binding
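

# A minimal sketch (not part of the upstream module) of the resolution order
# implemented by get_binding(). It assumes load_binding_registry() has run and
# that the azure-functions library is importable; the deferred binding name
# below is hypothetical:
#
#     load_binding_registry()
#     get_binding('httpTrigger')          # found in BINDING_REGISTRY
#     get_binding('someUnknownBinding')   # falls back to generic.GenericBinding
#     get_binding('blob', is_deferred_binding=True)   # consults only
#                                                     # DEFERRED_BINDING_REGISTRY
#                                                     # before the generic fallback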
""" binding = None if binding is None and not is_deferred_binding: binding = BINDING_REGISTRY.get(bind_name) if binding is None and is_deferred_binding: binding = DEFERRED_BINDING_REGISTRY.get(bind_name) if binding is None: binding = generic.GenericBinding return binding def is_trigger_binding(bind_name: str) -> bool: binding = get_binding(bind_name) return binding.has_trigger_support() def check_input_type_annotation(bind_name: str, pytype: type, is_deferred_binding: bool) -> bool: global INPUT_TYPE_CHECK_OVERRIDE_MAP if bind_name in INPUT_TYPE_CHECK_OVERRIDE_MAP: return INPUT_TYPE_CHECK_OVERRIDE_MAP[bind_name](bind_name, pytype, is_deferred_binding) binding = get_binding(bind_name, is_deferred_binding) return binding.check_input_type_annotation(pytype) def check_output_type_annotation(bind_name: str, pytype: type) -> bool: global OUTPUT_TYPE_CHECK_OVERRIDE_MAP if bind_name in OUTPUT_TYPE_CHECK_OVERRIDE_MAP: return OUTPUT_TYPE_CHECK_OVERRIDE_MAP[bind_name](bind_name, pytype) binding = get_binding(bind_name) return binding.check_output_type_annotation(pytype) def has_implicit_output(bind_name: str) -> bool: binding = get_binding(bind_name) # Need to pass in bind_name to exempt Durable Functions if binding is generic.GenericBinding: return (getattr(binding, 'has_implicit_output', lambda: False) (bind_name)) else: # If the binding does not have metaclass of meta.InConverter # The implicit_output does not exist return getattr(binding, 'has_implicit_output', lambda: False)() def from_incoming_proto( binding: str, pb: protos.ParameterBinding, *, pytype: typing.Optional[type], trigger_metadata: typing.Optional[typing.Dict[str, protos.TypedData]], shmem_mgr: SharedMemoryManager, function_name: str, is_deferred_binding: typing.Optional[bool] = False) -> typing.Any: binding = get_binding(binding, is_deferred_binding) if trigger_metadata: metadata = { k: datumdef.Datum.from_typed_data(v) for k, v in trigger_metadata.items() } else: metadata = {} pb_type = pb.WhichOneof(PB_TYPE) if pb_type == PB_TYPE_DATA: val = pb.data datum = datumdef.Datum.from_typed_data(val) elif pb_type == PB_TYPE_RPC_SHARED_MEMORY: # Data was sent over shared memory, attempt to read datum = datumdef.Datum.from_rpc_shared_memory(pb.rpc_shared_memory, shmem_mgr) else: raise TypeError(f'Unknown ParameterBindingType: {pb_type}') try: # if the binding is an sdk type binding if is_deferred_binding: return deferred_bindings_decode(binding=binding, pb=pb, pytype=pytype, datum=datum, metadata=metadata, function_name=function_name) return binding.decode(datum, trigger_metadata=metadata) except NotImplementedError: # Binding does not support the data. dt = val.WhichOneof('data') raise TypeError( f'unable to decode incoming TypedData: ' f'unsupported combination of TypedData field {dt!r} ' f'and expected binding type {binding}') def get_datum(binding: str, obj: typing.Any, pytype: typing.Optional[type]) -> datumdef.Datum: """ Convert an object to a datum with the specified type. """ binding = get_binding(binding) try: datum = binding.encode(obj, expected_type=pytype) except NotImplementedError: # Binding does not support the data. 


def get_datum(binding: str, obj: typing.Any,
              pytype: typing.Optional[type]) -> datumdef.Datum:
    """
    Convert an object to a datum with the specified type.
    """
    binding = get_binding(binding)
    try:
        datum = binding.encode(obj, expected_type=pytype)
    except NotImplementedError:
        # Binding does not support the data.
        raise TypeError(
            f'unable to encode outgoing TypedData: '
            f'unsupported type "{binding}" for '
            f'Python type "{type(obj).__name__}"')
    return datum


def _does_datatype_support_caching(datum: datumdef.Datum):
    supported_datatypes = ('bytes', 'string')
    return datum.type in supported_datatypes


def _can_transfer_over_shmem(shmem_mgr: SharedMemoryManager,
                             is_function_data_cache_enabled: bool,
                             datum: datumdef.Datum):
    """
    If shared memory is enabled and supported for the given datum, try to
    transfer to host over shared memory as a default.
    If caching is enabled, then also check if this type is supported - if so,
    transfer over shared memory.
    In case of caching, some conditions (like object size) may not be
    applicable since even small objects are allowed to be cached.
    """
    if not shmem_mgr.is_enabled():
        # If shared memory usage is not enabled, no further checks required
        return False
    if shmem_mgr.is_supported(datum):
        # If transferring this object over shared memory is supported, do so
        return True
    if is_function_data_cache_enabled and \
            _does_datatype_support_caching(datum):
        # If caching is enabled and this object can be cached, transfer over
        # shared memory (since the cache uses shared memory).
        # In this case, some requirements (like object size) for using shared
        # memory may be ignored since we want to support caching of small
        # objects (those with sizes smaller than the minimum we transfer over
        # shared memory when the cache is not enabled) as well.
        return True
    return False


def to_outgoing_proto(binding: str, obj: typing.Any, *,
                      pytype: typing.Optional[type]) -> protos.TypedData:
    datum = get_datum(binding, obj, pytype)
    return datumdef.datum_as_proto(datum)


def to_outgoing_param_binding(binding: str, obj: typing.Any, *,
                              pytype: typing.Optional[type],
                              out_name: str,
                              shmem_mgr: SharedMemoryManager,
                              is_function_data_cache_enabled: bool) \
        -> protos.ParameterBinding:
    datum = get_datum(binding, obj, pytype)
    shared_mem_value = None
    if _can_transfer_over_shmem(shmem_mgr, is_function_data_cache_enabled,
                                datum):
        shared_mem_value = datumdef.Datum.to_rpc_shared_memory(datum,
                                                               shmem_mgr)
    # Check if data was written into shared memory
    if shared_mem_value is not None:
        # If it was, use the rpc_shared_memory field in the response message
        return protos.ParameterBinding(
            name=out_name,
            rpc_shared_memory=shared_mem_value)
    else:
        # If not, send it as part of the response message over RPC.
        # rpc_val can be None here as we now support a None return type
        rpc_val = datumdef.datum_as_proto(datum)
        return protos.ParameterBinding(
            name=out_name,
            data=rpc_val)
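

# A minimal sketch (not part of the upstream module) of the shared-memory
# decision made by to_outgoing_param_binding(). The binding name, payload, and
# out_name are assumptions for illustration. With shared memory disabled the
# datum is inlined into ParameterBinding.data; with it enabled (and the datum
# large enough, or cacheable when the function data cache is on) it is written
# out via rpc_shared_memory instead:
#
#     pb = to_outgoing_param_binding(
#         'blob', b'x' * (2 ** 20),
#         pytype=bytes,
#         out_name='$return',
#         shmem_mgr=shmem_mgr,
#         is_function_data_cache_enabled=False)
#     pb.WhichOneof('rpc_data')   # 'rpc_shared_memory' or 'data', depending
#                                 # on shmem_mgr.is_enabled()/is_supported()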


def deferred_bindings_decode(binding: typing.Any,
                             pb: protos.ParameterBinding, *,
                             pytype: typing.Optional[type],
                             datum: typing.Any,
                             metadata: typing.Any,
                             function_name: str):
    """
    This cache holds deferred binding types (i.e. BlobClient, ContainerClient)
    that have already been created, so that the worker can reuse the
    previously created type without creating a new one.

    For async types, the function_name is needed as a key to differentiate.
    This prevents a known SDK issue where reusing a client across functions
    can lose the session context and cause an error.

    The cache key is based on: param name, type, resource, function_name.

    If the cache is empty or the key doesn't exist, deferred_binding_type
    is None.
    """
    global deferred_bindings_cache

    # Only applies to Event Hub and Service Bus - cannot cache.
    # These types will always produce different content and are not clients.
    if (datum.type == "collection_model_binding_data"
            or datum.value.source == "AzureEventHubsEventData"
            or datum.value.source == "AzureServiceBusReceivedMessage"):
        return binding.decode(datum,
                              trigger_metadata=metadata,
                              pytype=pytype)

    if deferred_bindings_cache.get((pb.name,
                                    pytype,
                                    datum.value.content,
                                    function_name), None) is not None:
        return deferred_bindings_cache.get((pb.name,
                                            pytype,
                                            datum.value.content,
                                            function_name))
    else:
        deferred_binding_type = binding.decode(datum,
                                               trigger_metadata=metadata,
                                               pytype=pytype)

        deferred_bindings_cache[(pb.name,
                                 pytype,
                                 datum.value.content,
                                 function_name)] = deferred_binding_type

        return deferred_binding_type


def check_deferred_bindings_enabled(param_anno: type,
                                    deferred_bindings_enabled: bool) -> (bool,
                                                                         bool):
    """
    Checks if deferred bindings is enabled at the function and single
    binding level.

    The first bool represents if deferred bindings is enabled at the
    function level.
    The second represents if the current binding is a deferred binding.
    """
    if (DEFERRED_BINDING_REGISTRY is not None
            and DEFERRED_BINDING_REGISTRY.check_supported_type(param_anno)):
        return True, True
    else:
        return deferred_bindings_enabled, False


def get_deferred_raw_bindings(indexed_function, input_types):
    """
    Calls a method from the base extension that generates the raw bindings
    for a given function. It also returns logs for that function, including
    the defined binding type and whether deferred bindings is enabled for
    that binding.
    """
    raw_bindings, bindings_logs = DEFERRED_BINDING_REGISTRY.get_raw_bindings(
        indexed_function, input_types)
    return raw_bindings, bindings_logs
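

# A minimal sketch (not part of the upstream module) of the reuse behaviour of
# deferred_bindings_decode(). BlobClient and the function name are assumptions
# for illustration; the key point is that the cache key
# (pb.name, pytype, datum.value.content, function_name) makes repeated decodes
# of the same client-style binding return the same cached object:
#
#     first = deferred_bindings_decode(binding, pb, pytype=BlobClient,
#                                      datum=datum, metadata={},
#                                      function_name='func_a')
#     again = deferred_bindings_decode(binding, pb, pytype=BlobClient,
#                                      datum=datum, metadata={},
#                                      function_name='func_a')
#     assert first is again   # served from deferred_bindings_cache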