# storage/decorator.py
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
import gzip
from io import BytesIO
from typing import Any, Iterator, Optional, Union
from share import ExpandEventListFromField, FeedIterator, ProtocolMultiline, json_parser, shared_logger
from .storage import CHUNK_SIZE, ProtocolStorageType, StorageDecoratorCallable, StorageDecoratorIterator, StorageReader
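# The decorators below are meant to be stacked on top of a storage's raw chunk generator. A minimal sketch of
# one possible composition (the method name `_generate` is hypothetical; concrete storage classes define their
# own generator and may stack a different subset of decorators):
#
#     @multi_line
#     @json_collector
#     @by_lines
#     @inflate
#     def _generate(self, range_start: int, body: BytesIO, is_gzipped: bool) -> StorageDecoratorIterator:
#         ...
#
# With this stacking the raw (possibly gzipped) bytes are inflated first, then split by lines, then collected
# as json objects and finally merged by the multiline processor, each step recomputing offsets and newline info.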
def by_lines(func: StorageDecoratorCallable[ProtocolStorageType]) -> StorageDecoratorCallable[ProtocolStorageType]:
"""
ProtocolStorage decorator for returning content split by lines
"""
def wrapper(
storage: ProtocolStorageType, range_start: int, body: BytesIO, is_gzipped: bool
) -> StorageDecoratorIterator:
ending_offset: int = range_start
unfinished_line: bytes = b""
iterator = func(storage, range_start, body, is_gzipped)
for data, _, _, _, _ in iterator:
assert isinstance(data, bytes)
unfinished_line += data
lines = unfinished_line.decode("utf-8").splitlines()
if len(lines) == 0:
continue
if unfinished_line.find(b"\r\n") > -1:
newline = b"\r\n"
elif unfinished_line.find(b"\n") > -1:
newline = b"\n"
else:
newline = b""
            # replace unfinished_line with the last element popped from lines, re-appending the newline
            # when the buffer ends with one
if unfinished_line.endswith(newline):
unfinished_line = lines.pop().encode() + newline
else:
unfinished_line = lines.pop().encode()
for line in lines:
line_encoded = line.encode("utf-8")
starting_offset = ending_offset
ending_offset += len(line_encoded) + len(newline)
shared_logger.debug("by_line lines", extra={"offset": ending_offset})
yield line_encoded, starting_offset, ending_offset, newline, None
if len(unfinished_line) > 0:
if unfinished_line.endswith(b"\r\n"):
newline = b"\r\n"
elif unfinished_line.endswith(b"\n"):
newline = b"\n"
else:
newline = b""
unfinished_line = unfinished_line.rstrip(newline)
starting_offset = ending_offset
ending_offset += len(unfinished_line) + len(newline)
shared_logger.debug("by_line unfinished_line", extra={"offset": ending_offset})
yield unfinished_line, starting_offset, ending_offset, newline, None
return wrapper
def multi_line(func: StorageDecoratorCallable[ProtocolStorageType]) -> StorageDecoratorCallable[ProtocolStorageType]:
"""
ProtocolStorage decorator for returning content collected by multiline
"""
def wrapper(
storage: ProtocolStorageType, range_start: int, body: BytesIO, is_gzipped: bool
) -> StorageDecoratorIterator:
multiline_processor: Optional[ProtocolMultiline] = storage.multiline_processor
if not multiline_processor:
iterator = func(storage, range_start, body, is_gzipped)
for data, starting_offset, ending_offset, newline, event_expanded_offset in iterator:
assert isinstance(data, bytes)
shared_logger.debug(
"no multi_line processor configured, processing as single line", extra={"offset": ending_offset}
)
yield data, starting_offset, ending_offset, newline, event_expanded_offset
else:
ending_offset = range_start
def iterator_to_multiline_feed() -> FeedIterator:
for data, _, _, newline, _ in func(storage, range_start, body, is_gzipped):
assert isinstance(data, bytes)
yield data, newline
multiline_processor.feed = iterator_to_multiline_feed()
for multiline_data, multiline_ending_offset, newline in multiline_processor.collect():
starting_offset = ending_offset
ending_offset += multiline_ending_offset
shared_logger.debug("multi_line lines", extra={"offset": ending_offset})
yield multiline_data, starting_offset, ending_offset, newline, None
return wrapper
class JsonCollectorState:
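    """
    Mutable state shared by the json_collector helpers: the storage being read, the starting/ending offsets of
    the event being built, the buffer of lines not yet parsed as json, and the flags and counter driving the
    1000-lines circuit breaker.
    """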
def __init__(self, storage: ProtocolStorageType):
self.storage: ProtocolStorageType = storage
self.starting_offset: int = 0
self.ending_offset: int = 0
self.unfinished_line: bytes = b""
self.has_an_object_start: bool = False
self.is_a_json_object: bool = False
self.is_a_json_object_circuit_broken: bool = False
self.is_a_json_object_circuit_breaker: int = 0
def json_collector(
func: StorageDecoratorCallable[ProtocolStorageType],
) -> StorageDecoratorCallable[ProtocolStorageType]:
"""
ProtocolStorage decorator for returning content by collected json object (if any) spanning multiple lines
If `json_content_type` is `single` and we don't have any `expand_event_list_from_field` set, we just collect all the
content, and yield. If `json_content_type` is `disabled` or we have a multiline processor set, we yield what we
receive from the previous decorator. If `json_content_type` is `None` or `ndjson` we try to parse che content as
json if we find the beginning of a potential json object (ie: the `{` char). This is done appending one line after
one line and passing the content to the json parser until it will be able to parse a full json object. If the
content is ndjson every json parsing attempt will be successful, in case it isn't the parsing will succeed only
after we collect a full json object spanning multiple lines. A circuit breaker is present in order to stop trying to
parse the json if we reached 1000 lines: in this case we yield the content as it is, line by line.
Once a json object is parsed, if we have an `expand_event_list_from_field` set we pass the json the "events list
from field expander" and yield the expanded events list instead.
"""
def _handle_offset(offset_skew: int, json_collector_state: JsonCollectorState) -> None:
json_collector_state.starting_offset = json_collector_state.ending_offset
json_collector_state.ending_offset += offset_skew
def _collector(
data: bytes, newline: bytes, json_collector_state: JsonCollectorState
) -> Iterator[tuple[bytes, Optional[dict[str, Any]]]]:
try:
# let's buffer the content
# we receive data without newline
# let's append it as well
json_collector_state.unfinished_line += data + newline
# let's try to decode
json_object = json_parser(json_collector_state.unfinished_line)
# it didn't raise: we collected a json object
data_to_yield = json_collector_state.unfinished_line
# let's reset the buffer
json_collector_state.unfinished_line = b""
# let's increase the offset for yielding
_handle_offset(len(data_to_yield), json_collector_state)
# let's decrease the circuit breaker by the number of lines in the data to yield
if newline != b"":
json_collector_state.is_a_json_object_circuit_breaker -= data_to_yield.count(newline) - 1
else:
json_collector_state.is_a_json_object_circuit_breaker -= 1
# let's trim surrounding newline
data_to_yield = data_to_yield.strip(b"\r\n").strip(b"\n")
# let's set the flag for json object
json_collector_state.is_a_json_object = True
# finally yield
yield data_to_yield, json_object
# it raised ValueError: we didn't collect enough content
# to reach the end of the json object
# let's keep iterating
except ValueError:
# it's an empty line, let's yield it
if (
json_collector_state.is_a_json_object
and len(json_collector_state.unfinished_line.strip(b"\r\n").strip(b"\n")) == 0
):
# let's reset the buffer
json_collector_state.unfinished_line = b""
# let's increase the offset for yielding
_handle_offset(len(newline), json_collector_state)
# finally yield
yield b"", None
else:
# buffer was not a complete json object
# let's increase the circuit breaker
json_collector_state.is_a_json_object_circuit_breaker += 1
# if the first 1k lines are not a json object let's give up
if json_collector_state.is_a_json_object_circuit_breaker > 1000:
json_collector_state.is_a_json_object_circuit_broken = True
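    # when json collection gives up (the circuit breaker tripped or no json object was ever found) the buffered
    # unfinished_line is re-fed through by_lines so that it is still emitted line by line with updated offsets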
def _by_lines_fallback(json_collector_state: JsonCollectorState) -> StorageDecoratorIterator:
@by_lines
def wrapper(
storage: ProtocolStorageType, range_start: int, body: BytesIO, is_gzipped: bool
) -> StorageDecoratorIterator:
data_to_yield: bytes = body.read()
yield data_to_yield, 0, range_start, b"", None
for line, _, _, newline, _ in wrapper(
json_collector_state.storage,
json_collector_state.ending_offset,
BytesIO(json_collector_state.unfinished_line),
False,
):
assert isinstance(line, bytes)
_handle_offset(len(line) + len(newline), json_collector_state)
# let's reset the buffer
json_collector_state.unfinished_line = b""
# let's set the flag for direct yield from now on
json_collector_state.has_an_object_start = False
yield line, _, _, newline, None
def _collect_single(iterator: StorageDecoratorIterator) -> StorageDecoratorIterator:
        # we consume the original iterator and collect everything into a list, so that we can merge the
        # content and take offsets and newline from its first and last entries
        single: list[tuple[Union[StorageReader, bytes], int, int, bytes]] = [
            (data, starting_offset, ending_offset, newline)
            for data, starting_offset, ending_offset, newline, _ in iterator
        ]
newline = single[0][-1]
starting_offset = single[0][1]
ending_offset = single[-1][2]
data_to_yield: bytes = newline.join([x[0] for x in single])
yield data_to_yield, starting_offset, ending_offset, newline, None
def wrapper(
storage: ProtocolStorageType, range_start: int, body: BytesIO, is_gzipped: bool
) -> StorageDecoratorIterator:
json_collector_state = JsonCollectorState(storage=storage)
multiline_processor: Optional[ProtocolMultiline] = storage.multiline_processor
if storage.json_content_type == "disabled" or multiline_processor:
iterator = func(storage, range_start, body, is_gzipped)
for data, starting_offset, ending_offset, newline, _ in iterator:
assert isinstance(data, bytes)
shared_logger.debug("json_collector skipped", extra={"offset": ending_offset})
yield data, starting_offset, ending_offset, newline, None
else:
event_list_from_field_expander: Optional[ExpandEventListFromField] = storage.event_list_from_field_expander
json_collector_state.ending_offset = range_start
iterator = func(storage, range_start, body, is_gzipped)
# if we know it's a single json we wrap the iterator with _collect_single
# and mark the object as json and started
if storage.json_content_type == "single":
iterator = _collect_single(iterator=iterator)
json_collector_state.is_a_json_object = True
json_collector_state.has_an_object_start = True
for data, starting_offset, ending_offset, newline, _ in iterator:
assert isinstance(data, bytes)
# let's wait for the start of a json object
if not json_collector_state.has_an_object_start:
# if range_start is greater than zero, or we have leading space, data can be empty
stripped_data = data.decode("utf-8").lstrip()
if len(stripped_data) > 0 and stripped_data[0] == "{":
                        # we mark the potential start of a json object
                        # CAVEAT: if the log entry starts with `{` but the
                        # content is not json, we buffer up to 1000 lines
                        # before the circuit breaker kicks in
json_collector_state.has_an_object_start = True
                    # if we still have no json object start we can just forward the content by lines
if not json_collector_state.has_an_object_start:
_handle_offset(len(data) + len(newline), json_collector_state)
yield data, starting_offset, ending_offset, newline, None
# it has a json object start, let's apply our logic
if json_collector_state.has_an_object_start:
                    # it is a single json and we have no field expander, let's yield the content as is
if event_list_from_field_expander is None and storage.json_content_type == "single":
yield data, starting_offset, ending_offset, newline, None
else:
# it is not single, or we have a field expander. let's try to collect the data as json
for data_to_yield, json_object in _collector(data, newline, json_collector_state):
shared_logger.debug(
"json_collector objects", extra={"offset": json_collector_state.ending_offset}
)
# we have a field expander, let's yield the expansion
if event_list_from_field_expander is not None:
for (
expanded_log_event,
expanded_starting_offset,
expanded_ending_offset,
expanded_event_n,
) in event_list_from_field_expander.expand(
data_to_yield,
json_object,
json_collector_state.starting_offset,
json_collector_state.ending_offset,
):
yield (
expanded_log_event,
expanded_starting_offset,
expanded_ending_offset,
newline,
expanded_event_n,
)
else:
                                # we do not have a field expander, let's yield the collected json content
yield (
data_to_yield,
json_collector_state.starting_offset,
json_collector_state.ending_offset,
newline,
None,
)
del json_object
                        # check if the circuit breaker has tripped
if json_collector_state.is_a_json_object_circuit_broken:
# let's yield what we have so far
for line, _, _, original_newline, _ in _by_lines_fallback(json_collector_state):
yield (
line,
json_collector_state.starting_offset,
json_collector_state.ending_offset,
original_newline,
None,
)
            # in this case we could have a trailing newline in what's left in the buffer,
            # or the content had a leading `{` but was not a json object before the circuit breaker intercepted it,
            # or we waited for a json object start that never came:
            # let's fall back to by_lines()
if not json_collector_state.is_a_json_object:
for line, _, _, newline, _ in _by_lines_fallback(json_collector_state):
yield line, json_collector_state.starting_offset, json_collector_state.ending_offset, newline, None
return wrapper
def inflate(func: StorageDecoratorCallable[ProtocolStorageType]) -> StorageDecoratorCallable[ProtocolStorageType]:
"""
ProtocolStorage decorator for returning inflated content in case the original is gzipped
"""
def wrapper(
storage: ProtocolStorageType, range_start: int, body: BytesIO, is_gzipped: bool
) -> StorageDecoratorIterator:
iterator = func(storage, range_start, body, is_gzipped)
for data, _, _, _, _ in iterator:
if is_gzipped:
gzip_stream = gzip.GzipFile(fileobj=data) # type:ignore
gzip_stream.seek(range_start)
while True:
inflated_chunk: bytes = gzip_stream.read(CHUNK_SIZE)
if len(inflated_chunk) == 0:
break
                    shared_logger.debug("inflate inflate")
                    yield inflated_chunk, 0, 0, b"", None
else:
shared_logger.debug("inflate plain")
yield data, 0, 0, b"", None
return wrapper