in azure_functions_worker/bindings/meta.py [0:0]
def from_incoming_proto(
        binding: str,
        pb: protos.ParameterBinding, *,
        pytype: typing.Optional[type],
        trigger_metadata: typing.Optional[typing.Dict[str, protos.TypedData]],
        shmem_mgr: SharedMemoryManager,
        function_name: str,
        is_deferred_binding: typing.Optional[bool] = False) -> typing.Any:
    """Decode an incoming ``ParameterBinding`` message into a Python value.

    The binding name is resolved with ``get_binding``; the payload carried by
    ``pb`` is converted to a ``Datum`` and then handed to the resolved
    binding's ``decode`` (or to ``deferred_bindings_decode`` when
    ``is_deferred_binding`` is truthy).

    Args:
        binding: Name of the binding, resolved through ``get_binding``.
        pb: The incoming parameter binding message. Its ``PB_TYPE`` oneof
            selects between inline data and an RPC shared-memory payload.
        pytype: Forwarded unchanged to ``deferred_bindings_decode``; not used
            on the regular decode path.
        trigger_metadata: Optional mapping of metadata names to ``TypedData``.
            Each value is converted with ``Datum.from_typed_data``; a falsy
            value results in an empty metadata dict.
        shmem_mgr: Shared memory manager passed to
            ``Datum.from_rpc_shared_memory`` when the payload was sent over
            shared memory.
        function_name: Forwarded unchanged to ``deferred_bindings_decode``.
        is_deferred_binding: When truthy, decoding is delegated to
            ``deferred_bindings_decode`` instead of ``binding.decode``.

    Returns:
        Whatever the selected decode routine returns.

    Raises:
        TypeError: If the ``PB_TYPE`` oneof holds neither supported field, or
            if the decode routine raises ``NotImplementedError`` for the
            supplied data.
    """
    binding = get_binding(binding, is_deferred_binding)

    if trigger_metadata:
        metadata = {
            k: datumdef.Datum.from_typed_data(v)
            for k, v in trigger_metadata.items()
        }
    else:
        metadata = {}

    # Stays None unless the payload was sent inline; the error handler below
    # must not assume it is set (it is not on the shared-memory path).
    typed_data = None

    pb_type = pb.WhichOneof(PB_TYPE)
    if pb_type == PB_TYPE_DATA:
        typed_data = pb.data
        datum = datumdef.Datum.from_typed_data(typed_data)
    elif pb_type == PB_TYPE_RPC_SHARED_MEMORY:
        # Data was sent over shared memory, attempt to read
        datum = datumdef.Datum.from_rpc_shared_memory(pb.rpc_shared_memory,
                                                      shmem_mgr)
    else:
        raise TypeError(f'Unknown ParameterBindingType: {pb_type}')

    try:
        # if the binding is an sdk type binding
        if is_deferred_binding:
            return deferred_bindings_decode(binding=binding,
                                            pb=pb,
                                            pytype=pytype,
                                            datum=datum,
                                            metadata=metadata,
                                            function_name=function_name)
        return binding.decode(datum, trigger_metadata=metadata)
    except NotImplementedError:
        # Binding does not support the data.
        if typed_data is not None:
            dt = typed_data.WhichOneof('data')
        else:
            # No inline TypedData exists for shared-memory payloads; report
            # the oneof field name instead of failing with UnboundLocalError.
            dt = pb_type
        raise TypeError(
            f'unable to decode incoming TypedData: '
            f'unsupported combination of TypedData field {dt!r} '
            f'and expected binding type {binding}')