in azure/functions/cosmosdb.py [0:0]
def decode(cls,
           data: meta.Datum,
           *,
           trigger_metadata) -> typing.Optional[cdb.DocumentList]:
    """Decode a Cosmos DB binding payload into a ``cdb.DocumentList``.

    Args:
        data: Datum carrying the serialized payload. ``'string'`` and
            ``'json'`` payloads are parsed as-is; ``'bytes'`` payloads
            are first decoded as UTF-8 text.
        trigger_metadata: Accepted for interface compatibility; this
            decoder does not read it.

    Returns:
        ``None`` when ``data`` or ``data.type`` is ``None``. Otherwise a
        ``cdb.DocumentList`` with one entry per element of the parsed
        JSON. A payload that is not a JSON array is treated as a single
        element, and JSON ``null`` elements are kept as ``None``.

    Raises:
        NotImplementedError: If ``data.type`` is not ``'string'``,
            ``'json'`` or ``'bytes'``.
        ValueError: If the payload is not valid JSON (raised by
            ``json.loads`` as ``json.JSONDecodeError``).
    """
    if data is None or data.type is None:
        return None

    data_type = data.type

    if data_type in ('string', 'json'):
        body = data.value
    elif data_type == 'bytes':
        body = data.value.decode('utf-8')
    else:
        # The message names Cosmos DB (not "queue") so the failing
        # binding can be identified from the error text alone.
        raise NotImplementedError(
            f'unsupported Cosmos DB payload type: {data_type}')

    documents = json.loads(body)
    if not isinstance(documents, list):
        # Normalise a lone JSON value to a one-element list.
        documents = [documents]

    # Null entries are passed through as None instead of being handed
    # to Document.from_dict.
    return cdb.DocumentList(
        (None if doc is None else cdb.Document.from_dict(doc))
        for doc in documents)