in azure/functions/kafka.py [0:0]
def decode_single_event(cls, data: meta.Datum,
                        trigger_metadata) -> KafkaEvent:
    """Build a ``KafkaEvent`` from one datum and its trigger metadata.

    The event body is always handed to ``KafkaEvent`` in the form produced
    here: ``'string'`` and ``'json'`` datum values are encoded to UTF-8,
    while ``'bytes'`` datum values are passed through unchanged.

    The ``Timestamp``, ``Key``, ``Partition``, ``Offset``, ``Topic`` and
    ``Headers`` entries are looked up through
    ``cls._decode_trigger_metadata_field`` with the expected Python type
    for each field; ``trigger_metadata`` itself is also forwarded as-is.

    :param data: The datum carrying the event payload.
    :param trigger_metadata: Metadata collection accompanying the trigger.
    :return: The decoded ``KafkaEvent``.
    :raises NotImplementedError: If ``data.type`` is not ``'string'``,
        ``'json'`` or ``'bytes'``.
    """
    data_type = data.type

    if data_type in ('string', 'json'):
        # Textual payloads are normalised to UTF-8 encoded bytes.
        payload = data.value.encode('utf-8')
    elif data_type == 'bytes':
        payload = data.value
    else:
        raise NotImplementedError(
            f'unsupported event data payload type: {data_type}')

    def metadata_field(field_name, expected_type):
        # Single place for the repeated metadata lookup call.
        return cls._decode_trigger_metadata_field(
            trigger_metadata, field_name, python_type=expected_type)

    return KafkaEvent(
        body=payload,
        timestamp=metadata_field('Timestamp', str),
        key=metadata_field('Key', str),
        partition=metadata_field('Partition', int),
        offset=metadata_field('Offset', int),
        topic=metadata_field('Topic', str),
        headers=metadata_field('Headers', list),
        trigger_metadata=trigger_metadata,
    )