share/multiline.py (285 lines of code) (raw):

# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one # or more contributor license agreements. Licensed under the Elastic License 2.0; # you may not use this file except in compliance with the Elastic License 2.0. # This file is a porting of the multiline processor on beats. from __future__ import annotations import datetime import re from abc import ABCMeta from typing import Callable, Iterator, Optional, Protocol from typing_extensions import TypeAlias default_max_bytes: int = 10485760 # Default maximum number of bytes to return in one multi-line event default_max_lines: int = 500 # Default maximum number of lines to return in one multi-line event default_multiline_timeout: int = 5 # Default timeout in secs to finish a multi-line event. timedelta_circuit_breaker: datetime.timedelta = datetime.timedelta(seconds=5) # CollectTuple is a tuple representing the multilines bytes content, the length of the content and the newline found # These data is instrumental to the `StorageDecoratorIterator` that needs the content to yield, the starting and ending # offsets and the newline CollectTuple: TypeAlias = tuple[bytes, int, bytes] # CollectIterator yields a `CollectTuple` CollectIterator: TypeAlias = Iterator[CollectTuple] # FeedIterator yields a tuple representing the content and its newline to feed of the `ProtocolMultiline` implementation FeedIterator: TypeAlias = Iterator[tuple[bytes, bytes]] class CommonMultiline(metaclass=ABCMeta): """ Common class for Multiline components """ _feed: FeedIterator _buffer: CollectBuffer _pre_collect_buffer: bool @property def feed(self) -> FeedIterator: return self._feed @feed.setter def feed(self, value: FeedIterator) -> None: self._feed = value class ProtocolMultiline(Protocol): """ Protocol class for Multiline components """ _feed: FeedIterator _buffer: CollectBuffer @property def feed(self) -> FeedIterator: pass # pragma: no cover @feed.setter def feed(self, value: FeedIterator) -> None: pass # pragma: no cover def collect(self) -> CollectIterator: pass # pragma: no cover class CollectBuffer: """ MessageBuffer. This class implements a buffer for collecting multiline content with criteria. """ def __init__(self, max_bytes: int, max_lines: int, skip_newline: bool): self._max_bytes: int = max_bytes self._max_lines: int = max_lines self._skip_newline: bool = skip_newline self._buffer: bytes = b"" self._previous: bytes = b"" self._previous_newline: bytes = b"" self._previous_was_empty: bool = False self._buffer_lines: int = 0 self._processed_lines: int = 0 self._current_length: int = 0 def collect_and_reset(self) -> CollectTuple: data = self._buffer current_length = self._current_length self._buffer = b"" self._current_length = 0 self._buffer_lines = 0 self._processed_lines = 0 self.previous = b"" if data.find(b"\r\n") > -1: newline = b"\r\n" elif data.find(b"\n") > -1: newline = b"\n" else: newline = b"" return data, current_length, newline def is_empty(self) -> bool: return self._buffer_lines == 0 @property def previous(self) -> bytes: return self._previous @previous.setter def previous(self, value: bytes) -> None: self._previous = value def grow(self, data: bytes, newline: bytes) -> None: add_newline: bool = False if (len(self._buffer) > 0 or self._previous_was_empty) and not self._skip_newline: self._previous_was_empty = False add_newline = True if len(data) == 0: self._previous_was_empty = True below_max_lines: bool = False if self._max_lines <= 0 or self._buffer_lines < self._max_lines: below_max_lines = True grow_size: int = self._max_bytes - len(self._buffer) below_max_bytes: bool = False if self._max_bytes <= 0 or grow_size > 0: below_max_bytes = True self._current_length += len(data) + len(newline) if below_max_lines and below_max_bytes: if grow_size < 0 or grow_size > len(data): grow_size = len(data) if add_newline: self._buffer += self._previous_newline self._buffer += data[:grow_size] self._buffer_lines += 1 self._previous_newline = newline self.previous = data self._processed_lines += 1 class CountMultiline(CommonMultiline): """ CountMultiline Multiline. This class implements concrete Count Multiline. """ def __init__( self, count_lines: int, max_bytes: int = default_max_bytes, max_lines: int = default_max_lines, skip_newline: bool = False, ): self._max_bytes: int = max_bytes self._max_lines: int = max_lines self._skip_newline: bool = skip_newline self._count_lines: int = count_lines self._current_count: int = 0 self._buffer: CollectBuffer = CollectBuffer(max_bytes, max_lines, skip_newline) def __eq__(self, other: object) -> bool: if not isinstance(other, CountMultiline): return False return ( self._count_lines == other._count_lines and self._max_bytes == other._max_bytes and self._max_lines == self._max_lines and self._skip_newline == self._skip_newline ) def collect(self) -> CollectIterator: last_iteration_datetime: datetime.datetime = datetime.datetime.now(datetime.UTC) for data, newline in self.feed: self._buffer.grow(data, newline) self._current_count += 1 if ( self._count_lines == self._current_count or (datetime.datetime.now(datetime.UTC) - last_iteration_datetime) > timedelta_circuit_breaker ): self._current_count = 0 yield self._buffer.collect_and_reset() if not self._buffer.is_empty(): yield self._buffer.collect_and_reset() # WhileMatcherCallable accepts a pattern in bytes to be compiled as regex. # It returns a boolean indicating if the content matches a "while" multiline pattern or not. WhileMatcherCallable = Callable[[bytes], bool] class WhileMultiline(CommonMultiline): """ WhileMultiline Multiline. This class implements concrete While Multiline. """ def __init__( self, pattern: str, negate: bool = False, max_bytes: int = default_max_bytes, max_lines: int = default_max_lines, skip_newline: bool = False, ): self._pattern: str = pattern self._negate: bool = negate self._max_bytes: int = max_bytes self._max_lines: int = max_lines self._skip_newline: bool = skip_newline self._matcher: WhileMatcherCallable = self._setup_pattern_matcher(pattern, negate) self._buffer: CollectBuffer = CollectBuffer(max_bytes, max_lines, skip_newline) self._pre_collect_buffer = True def __eq__(self, other: object) -> bool: if not isinstance(other, WhileMultiline): return False return ( self._pattern == other._pattern and self._negate == other._negate and self._max_bytes == other._max_bytes and self._max_lines == self._max_lines and self._skip_newline == self._skip_newline ) def _setup_pattern_matcher(self, pattern: str, negate: bool) -> WhileMatcherCallable: re_pattern: re.Pattern[bytes] = re.compile(pattern.encode("utf-8")) matcher: WhileMatcherCallable = self._get_pattern_matcher(re_pattern) if negate: matcher = self._negated_matcher(matcher) return matcher @staticmethod def _get_pattern_matcher(pattern: re.Pattern[bytes]) -> WhileMatcherCallable: def match(line: bytes) -> bool: return pattern.search(line) is not None return match @staticmethod def _negated_matcher(matcher: WhileMatcherCallable) -> WhileMatcherCallable: def negate(line: bytes) -> bool: return not matcher(line) return negate def collect(self) -> CollectIterator: last_iteration_datetime: datetime.datetime = datetime.datetime.now(datetime.UTC) for data, newline in self.feed: if not self._matcher(data): if self._buffer.is_empty(): self._buffer.grow(data, newline) yield self._buffer.collect_and_reset() else: content, current_length, _ = self._buffer.collect_and_reset() self._buffer.grow(data, newline) yield content, current_length, newline content, current_length, _ = self._buffer.collect_and_reset() yield content, current_length, newline else: self._buffer.grow(data, newline) # no pre collect buffer in while multiline, let's check the circuit breaker after at least one grow if (datetime.datetime.now(datetime.UTC) - last_iteration_datetime) > timedelta_circuit_breaker: yield self._buffer.collect_and_reset() if not self._buffer.is_empty(): yield self._buffer.collect_and_reset() # WhileMatcherCallable accepts the previous and the current content as arguments. # It returns a boolean indicating if the content matches a "pattern" multiline pattern or not. PatternMatcherCallable = Callable[[bytes, bytes], bool] # SelectCallable accepts the previous and the current content as arguments. # It returns either the previous or current content according to the matching type ("before" or "after"). SelectCallable = Callable[[bytes, bytes], bytes] class PatternMultiline(CommonMultiline): """ PatternMultiline Multiline. This class implements concrete Pattern Multiline. """ def __init__( self, pattern: str, match: str, negate: bool = False, flush_pattern: str = "", max_bytes: int = default_max_bytes, max_lines: int = default_max_lines, skip_newline: bool = False, ): self._pattern: str = pattern self._match: str = match self._negate: bool = negate self._max_bytes: int = max_bytes self._max_lines: int = max_lines self._skip_newline: bool = skip_newline self._matcher: PatternMatcherCallable = self._setup_pattern_matcher(pattern, match, negate) if flush_pattern: self._flush_pattern: Optional[re.Pattern[bytes]] = re.compile(flush_pattern.encode("utf-8")) else: self._flush_pattern = None self._buffer: CollectBuffer = CollectBuffer(max_bytes, max_lines, skip_newline) self._pre_collect_buffer: bool = True def __eq__(self, other: object) -> bool: if not isinstance(other, PatternMultiline): return False return ( self._pattern == other._pattern and self._match == other._match and self._negate == other._negate and self._max_bytes == other._max_bytes and self._max_lines == self._max_lines and self._skip_newline == self._skip_newline and self._flush_pattern == self._flush_pattern ) def _setup_pattern_matcher(self, pattern: str, match: str, negate: bool) -> PatternMatcherCallable: re_pattern: re.Pattern[bytes] = re.compile(pattern.encode("utf-8")) selector: Optional[SelectCallable] = None if match == "before": selector = self._before_matcher else: selector = self._after_matcher assert selector is not None matcher: PatternMatcherCallable = self._get_pattern_matcher(re_pattern, selector) if negate: matcher = self._negated_matcher(matcher) return matcher @staticmethod def _get_pattern_matcher(pattern: re.Pattern[bytes], selector: SelectCallable) -> PatternMatcherCallable: def match(previous: bytes, current: bytes) -> bool: line: bytes = selector(previous, current) return pattern.search(line) is not None return match @staticmethod def _before_matcher(previous: bytes, _: bytes) -> bytes: return previous @staticmethod def _after_matcher(_: bytes, current: bytes) -> bytes: return current @staticmethod def _negated_matcher(matcher: PatternMatcherCallable) -> PatternMatcherCallable: def negate(previous: bytes, current: bytes) -> bool: return not matcher(previous, current) return negate def _check_matcher(self) -> bool: return (self._match == "after" and len(self._buffer.previous) > 0) or self._match == "before" def collect(self) -> CollectIterator: for data, newline in self.feed: last_iteration_datetime: datetime.datetime = datetime.datetime.now(datetime.UTC) if self._pre_collect_buffer: self._buffer.collect_and_reset() self._buffer.grow(data, newline) self._pre_collect_buffer = False elif self._flush_pattern and self._flush_pattern.search(data) is not None: self._buffer.grow(data, newline) self._pre_collect_buffer = True yield self._buffer.collect_and_reset() elif ( not self._buffer.is_empty() and self._check_matcher() and not self._matcher(self._buffer.previous, data) ): content, current_length, _ = self._buffer.collect_and_reset() self._buffer.grow(data, newline) yield content, current_length, newline else: if (datetime.datetime.now(datetime.UTC) - last_iteration_datetime) > timedelta_circuit_breaker: yield self._buffer.collect_and_reset() self._buffer.grow(data, newline) if not self._buffer.is_empty(): yield self._buffer.collect_and_reset()