in storage/decorator.py [0:0]
def multi_line(func: StorageDecoratorCallable[ProtocolStorageType]) -> StorageDecoratorCallable[ProtocolStorageType]:
"""
ProtocolStorage decorator for returning content collected by multiline
"""
def wrapper(
storage: ProtocolStorageType, range_start: int, body: BytesIO, is_gzipped: bool
) -> StorageDecoratorIterator:
multiline_processor: Optional[ProtocolMultiline] = storage.multiline_processor
if not multiline_processor:
iterator = func(storage, range_start, body, is_gzipped)
for data, starting_offset, ending_offset, newline, event_expanded_offset in iterator:
assert isinstance(data, bytes)
shared_logger.debug(
"no multi_line processor configured, processing as single line", extra={"offset": ending_offset}
)
yield data, starting_offset, ending_offset, newline, event_expanded_offset
else:
ending_offset = range_start
def iterator_to_multiline_feed() -> FeedIterator:
for data, _, _, newline, _ in func(storage, range_start, body, is_gzipped):
assert isinstance(data, bytes)
yield data, newline
multiline_processor.feed = iterator_to_multiline_feed()
for multiline_data, multiline_ending_offset, newline in multiline_processor.collect():
starting_offset = ending_offset
ending_offset += multiline_ending_offset
shared_logger.debug("multi_line lines", extra={"offset": ending_offset})
yield multiline_data, starting_offset, ending_offset, newline, None
return wrapper