in azure/functions/eventgrid.py [0:0]
def encode(cls, obj: Any, *,
           expected_type: Optional[type]) -> Optional[Datum]:
    """Convert an output value into a ``meta.Datum``.

    Supported inputs, checked in this order:

    * ``str`` -> ``Datum`` of type ``'string'`` carrying the value as-is.
    * ``bytes`` -> ``Datum`` of type ``'bytes'`` carrying the value as-is.
    * ``azf_eventgrid.EventGridOutputEvent`` -> ``Datum`` of type
      ``'json'`` holding the event serialized as a JSON object.
    * any other iterable -> ``Datum`` of type ``'json'`` holding a JSON
      array; each item must be a ``str`` (emitted unchanged) or an
      ``EventGridOutputEvent`` (emitted as a JSON object).

    Args:
        obj: The value to encode.
        expected_type: Accepted for interface compatibility; it is not
            consulted by this implementation.

    Returns:
        The ``meta.Datum`` wrapping the encoded value.

    Raises:
        NotImplementedError: If ``obj`` is of an unsupported type, or if
            an iterable contains an item that is neither ``str`` nor
            ``EventGridOutputEvent``.
    """

    def _event_payload(event: Any) -> Dict[str, Any]:
        # Build the JSON-ready dict for one event. Shared by the
        # single-event and the iterable branches so both always emit
        # the same keys in the same order.
        return {
            'id': event.id,
            'subject': event.subject,
            'dataVersion': event.data_version,
            'eventType': event.event_type,
            'data': event.get_json(),
            'eventTime': cls._format_datetime(event.event_time),
        }

    # str and bytes are themselves iterable, so they must be matched
    # before the generic Iterable branch below.
    if isinstance(obj, str):
        return meta.Datum(type='string', value=obj)

    if isinstance(obj, bytes):
        return meta.Datum(type='bytes', value=obj)

    if isinstance(obj, azf_eventgrid.EventGridOutputEvent):
        return meta.Datum(
            type='json',
            value=json.dumps(_event_payload(obj))
        )

    if isinstance(obj, collections.abc.Iterable):
        msgs: List[Union[str, Dict[str, Any]]] = []
        for item in obj:
            if isinstance(item, str):
                msgs.append(item)
            elif isinstance(item, azf_eventgrid.EventGridOutputEvent):
                msgs.append(_event_payload(item))
            else:
                raise NotImplementedError(
                    'invalid data type in output '
                    'event grid event list: {}'.format(type(item)))

        return meta.Datum(
            type='json',
            value=json.dumps(msgs)
        )

    # Name the rejected type so the caller can see what was passed in.
    raise NotImplementedError(
        'unsupported event grid output type: {}'.format(type(obj)))