in azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusConverter.py [0:0]
def decode(cls, data: Datum, *, trigger_metadata, pytype) -> Any:
    """Convert an incoming ServiceBus binding payload into SDK object(s).

    ServiceBus can deliver messages one at a time or in batches, and the
    binding cardinality determines the shape of ``data``:

    * cardinality "one" -- ``data.type`` is ``"model_binding_data"``; each
      event is an independent function invocation and ``data.value`` is
      wrapped in a single ``ServiceBusReceivedMessage``.
    * cardinality "many" -- ``data.type`` is
      ``"collection_model_binding_data"``; all events arrive in a single
      invocation and ``data.value.model_binding_data`` holds one or more
      model_binding_data objects, each wrapped individually.

    Args:
        data: The payload to decode. May be ``None``.
        trigger_metadata: Accepted for interface compatibility; not read
            by this method.
        pytype: Accepted for interface compatibility; not read by this
            method.

    Returns:
        ``None`` when ``data`` or ``data.type`` is ``None``; otherwise the
        result of ``get_sdk_type()`` for a single message, or a list of
        such results for a batch.

    Raises:
        ValueError: If any step of decoding a batch fails (the original
            exception is chained as the cause), or if ``data.type`` is
            not one of the two supported kinds.
    """
    # Nothing to convert when the payload or its type tag is absent.
    if data is None or data.type is None:
        return None

    kind = data.type

    # Cardinality "one": a single message per invocation.
    if kind == "model_binding_data":
        message = ServiceBusReceivedMessage(data=data.value)
        return message.get_sdk_type()

    # Cardinality "many": every message of the batch in one invocation.
    if kind == "collection_model_binding_data":
        try:
            decoded = []
            for item in data.value.model_binding_data:
                wrapper = ServiceBusReceivedMessage(data=item)
                decoded.append(wrapper.get_sdk_type())
            return decoded
        except Exception as e:
            # Report any failure while walking the batch as one ValueError,
            # keeping the underlying exception attached as the cause.
            detail = "Failed to decode incoming ServiceBus batch: " + repr(e)
            raise ValueError(detail) from e

    # Any other type tag is not something this binding knows how to decode.
    raise ValueError(
        "Unexpected type of data received for the 'servicebus' binding: "
        + repr(data.type))