in storage/decorator.py [0:0]
def by_lines(func: StorageDecoratorCallable[ProtocolStorageType]) -> StorageDecoratorCallable[ProtocolStorageType]:
    """
    ProtocolStorage decorator for returning content split by lines

    The decorated callable is expected to yield 5-tuples whose first element
    is a ``bytes`` chunk; the other four elements are ignored here. The
    returned wrapper re-chunks that byte stream into lines and yields, for
    every line, the tuple::

        (line_without_terminator, starting_offset, ending_offset, newline, None)

    where the offsets are byte positions counted from ``range_start`` and
    ``newline`` is the terminator that actually followed the line
    (``b"\\r\\n"``, ``b"\\n"``, ``b"\\r"``, or ``b""`` for a final line
    that has no terminator).
    """

    def wrapper(
        storage: ProtocolStorageType, range_start: int, body: BytesIO, is_gzipped: bool
    ) -> StorageDecoratorIterator:
        ending_offset: int = range_start
        # Bytes received so far that have not been yielded yet. After the
        # first non-empty chunk this always holds exactly one (possibly
        # still incomplete) line, terminator included if it has one.
        pending: bytes = b""

        for data, _, _, _, _ in func(storage, range_start, body, is_gzipped):
            assert isinstance(data, bytes)

            pending += data

            # Split on the raw bytes, never on decoded text:
            #  * a chunk boundary may fall inside a multi-byte UTF-8
            #    character, which would make decoding the buffer fail;
            #  * bytes.splitlines() only breaks on "\n", "\r" and "\r\n",
            #    whereas str.splitlines() also breaks on characters such as
            #    U+2028 or form feed, which are not line terminators here.
            # keepends=True keeps each terminator attached to its line so
            # offsets can be advanced by the exact number of bytes consumed.
            pieces = pending.splitlines(keepends=True)
            if not pieces:
                continue

            # Always hold back the last piece: it may be an incomplete line,
            # or end with a "\r" whose matching "\n" arrives in the next
            # chunk. It is emitted later, either once more data follows it
            # or after the input is exhausted.
            pending = pieces.pop()

            for piece in pieces:
                # A piece holds exactly one terminator, at its very end, so
                # stripping CR/LF from the right removes only that one.
                line = piece.rstrip(b"\r\n")
                newline = piece[len(line) :]
                starting_offset = ending_offset
                ending_offset += len(piece)
                shared_logger.debug("by_line lines", extra={"offset": ending_offset})

                yield line, starting_offset, ending_offset, newline, None

        # Flush the held-back last line, with or without a terminator.
        if pending:
            line = pending.rstrip(b"\r\n")
            newline = pending[len(line) :]
            starting_offset = ending_offset
            ending_offset += len(pending)

            shared_logger.debug("by_line unfinished_line", extra={"offset": ending_offset})

            yield line, starting_offset, ending_offset, newline, None

    return wrapper