in azure/functions/queue.py [0:0]
def decode(cls, data: meta.Datum, *,
trigger_metadata) -> Any:
data_type = data.type
if data_type in ['string', 'bytes']:
body = data.value
else:
raise NotImplementedError(
f'unsupported queue payload type: {data_type}')
if trigger_metadata is None:
raise NotImplementedError(
'missing trigger metadata for queue input')
return QueueMessage(
id=cls._decode_trigger_metadata_field(
trigger_metadata, 'Id', python_type=str),
body=body,
dequeue_count=cls._decode_trigger_metadata_field(
trigger_metadata, 'DequeueCount', python_type=int),
expiration_time=cls._parse_datetime_metadata(
trigger_metadata, 'ExpirationTime'),
insertion_time=cls._parse_datetime_metadata(
trigger_metadata, 'InsertionTime'),
time_next_visible=cls._parse_datetime_metadata(
trigger_metadata, 'NextVisibleTime'),
pop_receipt=cls._decode_trigger_metadata_field(
trigger_metadata, 'PopReceipt', python_type=str)
)