darabonba/utils/stream.py (201 lines of code) (raw):
import json
import re
from darabonba.event import Event
from io import BytesIO, StringIO
from typing import Any, BinaryIO
# define WRITEABLE
sse_line_pattern = re.compile('(?P<name>[^:]*):?( ?(?P<value>.*))?')
class BaseStream:
def __init__(self, size=1024):
self.size = size
def read(self, size=1024):
raise NotImplementedError('read method must be overridden')
def __len__(self):
raise NotImplementedError('__len__ method must be overridden')
def __next__(self):
raise NotImplementedError('__next__ method must be overridden')
def __iter__(self):
return self
class _ReadableMc(type):
def __instancecheck__(self, instance):
if hasattr(instance, 'read') and hasattr(instance, '__iter__'):
return True
class READABLE(metaclass=_ReadableMc):
pass
class _WriteableMc(type):
def __instancecheck__(self, instance):
if hasattr(instance, 'write'):
return True
class WRITABLE(metaclass=_WriteableMc):
pass
STREAM_CLASS = (READABLE, WRITABLE)
class Stream:
def __init__(self, data=None):
self.data = data if data is not None else b''
self.position = 0
@staticmethod
def __read_part(f, size=1024):
while True:
part = f.read(size)
if part:
yield part
else:
return
@staticmethod
def __to_string(
val: bytes,
) -> str:
"""
Convert a bytes to string(utf8)
@return: the return string
"""
if isinstance(val, str):
return val
elif isinstance(val, bytes):
return val.decode('utf-8')
else:
return str(val)
@staticmethod
def __parse_json(
val: str,
) -> Any:
"""
Parse it by JSON format
@return: the parsed result
"""
try:
return json.loads(val)
except ValueError:
raise RuntimeError(f'Failed to parse the value as json format, Value: "{val}".')
@staticmethod
def read_as_bytes(stream) -> bytes:
"""
Read data from a readable stream, and compose it to a bytes
@param stream: the readable stream
@return: the bytes result
"""
if isinstance(stream, READABLE):
b = b''
for part in Stream.__read_part(stream, 1024):
b += part
return b
elif isinstance(stream, bytes):
return stream
else:
return bytes(stream, encoding='utf-8')
@staticmethod
async def read_as_bytes_async(stream) -> bytes:
"""
Read data from a readable stream, and compose it to a bytes
@param stream: the readable stream
@return: the bytes result
"""
if isinstance(stream, bytes):
return stream
elif isinstance(stream, str):
return bytes(stream, encoding='utf-8')
else:
return await stream.read()
@staticmethod
def read_as_json(stream) -> Any:
"""
Read data from a readable stream, and parse it by JSON format
@param stream: the readable stream
@return: the parsed result
"""
return Stream.__parse_json(Stream.read_as_string(stream))
@staticmethod
async def read_as_json_async(stream) -> Any:
"""
Read data from a readable stream, and parse it by JSON format
@param stream: the readable stream
@return: the parsed result
"""
return Stream.__parse_json(
await Stream.read_as_string_async(stream)
)
@staticmethod
def read_as_string(stream) -> str:
"""
Read data from a readable stream, and compose it to a string
@param stream: the readable stream
@return: the string result
"""
buff = Stream.read_as_bytes(stream)
return Stream.__to_string(buff)
@staticmethod
async def read_as_string_async(stream) -> str:
"""
Read data from a readable stream, and compose it to a string
@param stream: the readable stream
@return: the string result
"""
buff = await Stream.read_as_bytes_async(stream)
return Stream.__to_string(buff)
@staticmethod
def read_as_sse(stream):
bytes_content = Stream.read_as_bytes(stream)
lines = bytes_content.splitlines()
sse_line_pattern = re.compile(r'^(?P<name>[^:]+): (?P<value>.+)$')
current_event = Event() # Initialize current event
for line_item in lines:
line = line_item.decode('utf-8')
if not line.strip() or line.startswith(':'):
continue
match = sse_line_pattern.match(line)
if match:
name = match.group('name')
value = match.group('value')
if name == 'event':
current_event.event = value
elif name == 'id':
current_event.id = value
elif name == 'data':
current_event.data = value
elif name == 'retry':
try:
current_event.retry = int(value)
except ValueError:
pass
# If data is present, yield the event since data line indicates completion of an event typically
if current_event.data is not None:
yield {
'id': current_event.id,
'event': current_event.event,
'data': current_event.data
}
current_event = Event()
@staticmethod
async def read_as_sse_async(stream):
bytes_content = await Stream.read_as_bytes_async(stream)
lines = bytes_content.splitlines()
sse_line_pattern = re.compile(r'^(?P<name>[^:]+): (?P<value>.+)$')
event = Event()
async for line_item in lines:
line = line_item.decode('utf-8')
if not line.strip() or line.startswith(':'):
continue
match = sse_line_pattern.match(line)
if match:
name = match.group('name')
value = match.group('value')
if name == 'event':
current_event.event = value
elif name == 'id':
current_event.id = value
elif name == 'data':
current_event.data = value
elif name == 'retry':
try:
current_event.retry = int(value)
except ValueError:
pass
# If data is present, yield the event since data line indicates completion of an event typically
if current_event.data is not None:
yield {
'id': current_event.id,
'event': current_event.event,
'data': current_event.data
}
current_event = Event()
def read(self, size=None):
if size is None:
return self.data[self.position:]
start = self.position
end = min(start + size, len(self.data))
self.position = end
return self.data[start:end]
def write(self, data):
if isinstance(data, (bytes, str)):
self.data = data
else:
raise TypeError("Data should be bytes or string.")
def pipe(self, output_stream, buffer_size=1024):
if not isinstance(output_stream, Stream):
raise TypeError("Output stream should be an instance of Stream.")
while True:
chunk = self.read(buffer_size)
if not chunk:
break
output_stream.write(chunk)
@staticmethod
def to_readable(
value: Any,
) -> BinaryIO:
"""
Assert a value, if it is a readable, return it, otherwise throws
@return: the readable value
"""
if isinstance(value, str):
value = value.encode('utf-8')
if isinstance(value, bytes):
value = BytesIO(value)
elif not isinstance(value, READABLE):
raise ValueError(f'The value is not a readable')
return value
@staticmethod
def to_writeable(
value: Any,
) -> WRITABLE:
"""
Assert a value, if it is a writeable, return it, otherwise throws
@return: the writeable value
"""
if isinstance(value, str):
value = StringIO(value)
elif isinstance(value, bytes):
value = BytesIO(value)
elif not isinstance(value, WRITABLE):
raise ValueError(f'The value is not a writeable')
return value