in azure/functions/kafka.py [0:0]
def decode_multiple_events(cls, data: meta.Datum,
trigger_metadata) -> typing.List[KafkaEvent]:
parsed_data: List[bytes] = []
if data.type == 'collection_bytes':
parsed_data = data.value.bytes
elif data.type == 'collection_string':
parsed_data = [
d.encode('utf-8') for d in data.value.string
]
timestamp_props = trigger_metadata.get('TimestampArray')
key_props = trigger_metadata.get('KeyArray')
partition_props = trigger_metadata.get('PartitionArray')
offset_props = trigger_metadata.get('OffsetArray')
topic_props = trigger_metadata.get('TopicArray')
header_props = trigger_metadata.get('HeadersArray')
parsed_timestamp_props: List[Any] = cls.get_parsed_props(
timestamp_props, parsed_data)
parsed_key_props = cls.get_parsed_props(
key_props, parsed_data)
parsed_partition_props = cls.get_parsed_props(
partition_props, parsed_data)
parsed_offset_props: List[Any] = []
if offset_props is not None:
parsed_offset_props = [v for v in offset_props.value.sint64]
if len(parsed_offset_props) != len(parsed_data):
raise AssertionError(
'Number of bodies and metadata mismatched')
parsed_topic_props: List[Any]
if topic_props is not None:
parsed_topic_props = [v for v in topic_props.value.string]
parsed_headers_props: List[Any]
if header_props is not None:
parsed_headers_list = cls.get_parsed_props(header_props,
parsed_data)
parsed_headers_props = [v for v in parsed_headers_list]
events = []
for i in range(len(parsed_data)):
event = KafkaEvent(
body=parsed_data[i],
timestamp=parsed_timestamp_props[i],
key=parsed_key_props[i],
partition=parsed_partition_props[i],
offset=parsed_offset_props[i],
topic=parsed_topic_props[i],
headers=parsed_headers_props[i],
trigger_metadata=trigger_metadata
)
events.append(event)
return events