in darabonba/utils/stream.py [0:0]
def read_as_sse(stream):
    """Parse a Server-Sent Events payload and yield one dict per event.

    The payload is obtained up front through ``Stream.read_as_bytes``,
    split into lines, and each line is decoded as UTF-8. Blank lines and
    comment lines (those starting with ``:``) are skipped. Every other
    line is split at its first colon into a field name and a value. As in
    the SSE line format, one space after the colon is optional and is not
    part of the value, so ``data:x`` and ``data: x`` are equivalent.

    An event is emitted as soon as the current event carries ``data``,
    rather than at the blank line that terminates an event block. As a
    result, ``id`` / ``event`` fields are only attached to an event when
    they appear before its ``data`` line. ``retry`` is parsed and stored
    on the event object but is not part of the yielded dict.

    Args:
        stream: Source object, passed unchanged to ``Stream.read_as_bytes``.

    Yields:
        dict: ``{'id': ..., 'event': ..., 'data': ...}`` read from the
        current ``Event`` object.

    Raises:
        UnicodeDecodeError: If a line is not valid UTF-8.
    """
    payload = Stream.read_as_bytes(stream)
    current_event = Event()

    for raw_line in payload.splitlines():
        line = raw_line.decode('utf-8')

        # Blank lines and ':'-prefixed comment lines carry no field.
        if not line.strip() or line.startswith(':'):
            continue

        name, colon, value = line.partition(':')
        if not colon:
            # No colon at all: not a "name: value" line, ignore it.
            continue

        # Exactly one leading space (if present) is a separator, not data.
        if value.startswith(' '):
            value = value[1:]
        if not value:
            # Fields with an empty value are ignored.
            continue

        if name == 'event':
            current_event.event = value
        elif name == 'id':
            current_event.id = value
        elif name == 'data':
            current_event.data = value
        elif name == 'retry':
            try:
                current_event.retry = int(value)
            except ValueError:
                # Deliberate best-effort: a non-integer retry is ignored.
                pass

        # A populated data field is treated as the end of the event.
        # NOTE(review): assumes a fresh Event() has data set to None - confirm.
        if current_event.data is not None:
            yield {
                'id': current_event.id,
                'event': current_event.event,
                'data': current_event.data,
            }
            current_event = Event()