in azure/functions/queue.py [0:0]
def encode(cls, obj: Any, *,
           expected_type: Optional[type]) -> meta.Datum:
    """Convert a queue output value into a ``meta.Datum``.

    Supported inputs, checked in this order:

    * ``str`` -- wrapped as a ``'string'`` datum, value passed through.
    * ``bytes`` -- wrapped as a ``'bytes'`` datum, value passed through.
    * ``azf_queue.QueueMessage`` -- wrapped as a ``'json'`` datum whose
      value is a JSON object with the message ``id`` and its body
      (the result of ``get_body()`` decoded as UTF-8).
    * any other iterable -- wrapped as a ``'json'`` datum whose value is
      a JSON array; each element must be a ``str`` (emitted as-is) or a
      ``QueueMessage`` (emitted as the same ``id``/``body`` object).

    :param obj: The value to encode.
    :param expected_type: Accepted as part of the signature; this
        implementation does not consult it.
    :returns: The ``meta.Datum`` describing ``obj``.
    :raises NotImplementedError: If ``obj`` is none of the supported
        kinds, or if an iterable contains an element that is neither a
        ``str`` nor a ``QueueMessage``.
    """
    def _as_payload(message):
        # JSON-serialisable view of a single QueueMessage.
        return {
            'id': message.id,
            'body': message.get_body().decode('utf-8'),
        }

    if isinstance(obj, str):
        return meta.Datum(type='string', value=obj)

    if isinstance(obj, bytes):
        return meta.Datum(type='bytes', value=obj)

    if isinstance(obj, azf_queue.QueueMessage):
        return meta.Datum(type='json', value=json.dumps(_as_payload(obj)))

    # str and bytes are iterable too, which is why they are handled above
    # before this generic check.
    if not isinstance(obj, collections.abc.Iterable):
        raise NotImplementedError

    batch: List[Union[str, Dict]] = []
    for entry in obj:
        if isinstance(entry, str):
            batch.append(entry)
            continue
        if isinstance(entry, azf_queue.QueueMessage):
            batch.append(_as_payload(entry))
            continue
        raise NotImplementedError(
            'invalid data type in output '
            'queue message list: {}'.format(type(entry)))

    return meta.Datum(type='json', value=json.dumps(batch))