sdk/python/generative-ai/promptflow/deploy-flow/streaming/event_stream.py (60 lines of code) (raw):
class EventStream:
"""Accepts lines of text and decodes it into a stream of SSE events.
Refer to the following page for details:
https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events
This class is supposed to be iterated with a for loop like:
>>> for event in EventStream(lines):
... do_something_with(event)
"""
def __init__(self, lines, encoding="utf-8"):
self._lines = lines
self._encoding = encoding
@property
def decoded_lines(self):
for line in self._lines:
yield line.decode(self._encoding)
def __iter__(self):
return self
def __next__(self):
return Event.parse_from_lines(self.decoded_lines)
class Event:
"""A single event in the event stream."""
def __init__(self):
self.id = None
self.event = None
self.data = ""
def append_line(self, line):
if not line:
raise ValueError(
"Not supposed to accept empty lines. Please handle this outside of the Event class."
)
if ":" not in line:
raise ValueError("Bad format: Each line must contain `:`.")
parts = line.split(":", maxsplit=1)
if len(parts) < 2:
raise ValueError(
"Bad format: Each line must could be splited into two parts by ':'."
)
prefix = parts[0]
data = parts[1].strip()
if prefix == "id":
if self.id is not None:
raise ValueError(
"Bad event: event id cannot be specified multiple times."
)
self.event = data
if prefix == "event":
if self.event is not None:
raise ValueError(
"Bad event: event type cannot be specified multiple times."
)
self.event = data
if prefix == "data":
if not self.data:
self.data = data
else:
self.data = "\n".join((self.data, data))
# TODO: Handle other prefixes here
@staticmethod
def parse_from_lines(lines_stream):
"""Given a lines stream, parse an event from it.
It only parse the first event. The remainder are not touched.
"""
result = Event()
for line in lines_stream:
if not line:
return result
else:
result.append_line(line)
# If we reached the end of the input lines stream,
# raise StopIteration to indicate that no more events will happen
raise StopIteration()
def __str__(self):
# Defaults to "message" when event name is not defined.
event_name = self.event or "message"
return f"Event ({event_name}): {self.data}"