def by_lines()

in storage/decorator.py [0:0]


def by_lines(func: StorageDecoratorCallable[ProtocolStorageType]) -> StorageDecoratorCallable[ProtocolStorageType]:
    """
    ProtocolStorage decorator for returning content split by lines

    The wrapped callable is expected to yield 5-item tuples whose first item is a
    chunk of raw bytes; the remaining four items are ignored. Chunks are
    concatenated and re-emitted one line at a time as
    `(line, starting_offset, ending_offset, newline, None)` where:

    * `line` is the line content without its terminator
    * `newline` is the terminator that followed that very line
      (`b"\\n"`, `b"\\r\\n"`, `b"\\r"`, or `b""` for a final unterminated line)
    * the offsets are byte positions counted from `range_start`, and
      `ending_offset - starting_offset == len(line) + len(newline)`

    Splitting is done on the raw bytes, never on decoded text, so a multi-byte
    character split across two chunks cannot raise, and only `\\n`, `\\r\\n` and
    `\\r` are treated as line boundaries.
    """

    def wrapper(
        storage: ProtocolStorageType, range_start: int, body: BytesIO, is_gzipped: bool
    ) -> StorageDecoratorIterator:
        ending_offset: int = range_start
        unfinished_line: bytes = b""

        iterator = func(storage, range_start, body, is_gzipped)

        for data, _, _, _, _ in iterator:
            assert isinstance(data, bytes)

            unfinished_line += data

            # keepends=True keeps every terminator attached to its own line, so
            # offsets are computed from the bytes that were actually consumed
            raw_lines = unfinished_line.splitlines(keepends=True)
            if not raw_lines:
                continue

            # The last piece is complete only when it ends with b"\n". With no
            # terminator it is a partial line; with a trailing b"\r" it may be
            # the first half of a b"\r\n" whose b"\n" is in the next chunk.
            if raw_lines[-1].endswith(b"\n"):
                unfinished_line = b""
            else:
                unfinished_line = raw_lines.pop()

            for raw_line in raw_lines:
                # each raw line carries exactly one terminator and no inner
                # b"\r"/b"\n", so rstrip removes the terminator and nothing else
                line = raw_line.rstrip(b"\r\n")
                newline = raw_line[len(line) :]

                starting_offset = ending_offset
                ending_offset += len(raw_line)
                shared_logger.debug("by_line lines", extra={"offset": ending_offset})

                yield line, starting_offset, ending_offset, newline, None

        # whatever is left once the source is exhausted is the final line:
        # either unterminated or ending with a lone b"\r"
        if unfinished_line:
            line = unfinished_line.rstrip(b"\r\n")
            newline = unfinished_line[len(line) :]

            starting_offset = ending_offset
            ending_offset += len(unfinished_line)

            shared_logger.debug("by_line unfinished_line", extra={"offset": ending_offset})

            yield line, starting_offset, ending_offset, newline, None

    return wrapper