storage/storage.py (29 lines of code) (raw):

# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.

from abc import ABCMeta
from io import BytesIO
from typing import Any, Callable, Iterator, Optional, Protocol, TypeVar, Union

from typing_extensions import TypeAlias

from share import ExpandEventListFromField, ProtocolMultiline

# CHUNK_SIZE is how much we read from the gzip stream at every iteration in the inflate decorator
# BEWARE, this CHUNK_SIZE has a huge impact on performance, contrary to what we stated here:
# https://github.com/elastic/elastic-serverless-forwarder/pull/11#discussion_r732587976
# Reinstating to 1M from 1K resulted on 6.2M gzip of 35.1 of inflated content
# to be ingested in 45 secs instead of having the lambda timing out
CHUNK_SIZE: int = 1024**2


def is_gzip_content(content: bytes) -> bool:
    """
    Return True when `content` starts with the two gzip magic bytes.

    Only the first two bytes (0x1f 0x8b, written in octal below) are inspected;
    content shorter than two bytes, including empty content, yields False.
    """

    return content.startswith(b"\037\213")  # gzip magic number (ID1, ID2)


class StorageReader:
    """
    StorageReader is an interface for contents returned by storage.
    It wraps the underlying object and forwards to it every attribute
    that is not found on the wrapper itself.
    """

    def __init__(self, raw: Any):
        """
        Store `raw`, the wrapped object that attribute lookups are forwarded to.
        """

        self._raw = raw

    def __getattr__(self, item: str) -> Any:
        """
        Return attribute `item` of the wrapped object.

        Raises AttributeError when the wrapped object has no such attribute,
        or when the wrapped object itself has not been set on this instance.
        """

        # `__getattr__` only runs after normal lookup failed. If `_raw` itself is
        # missing (instance created without `__init__`, as `copy` and `pickle` do
        # before restoring state), reading `self._raw` below would re-enter this
        # method until RecursionError; fail with a regular AttributeError instead.
        if item == "_raw":
            raise AttributeError(item)

        return getattr(self._raw, item)


# GetByLinesIterator yields a tuple of content, starting offset, ending offset
# and optional offset of a list of expanded events
GetByLinesIterator: TypeAlias = Iterator[tuple[bytes, int, int, Optional[int]]]


class ProtocolStorage(Protocol):
    """
    Protocol for Storage components
    """

    json_content_type: Optional[str]
    multiline_processor: Optional[ProtocolMultiline]
    event_list_from_field_expander: Optional[ExpandEventListFromField]

    def get_by_lines(self, range_start: int) -> GetByLinesIterator:
        """
        Return a `GetByLinesIterator` over the stored content.
        `range_start` is presumably the offset to start reading from - confirm with implementers.
        """

        pass  # pragma: no cover

    def get_as_string(self) -> str:
        """
        Return the stored content as a string.
        """

        pass  # pragma: no cover


class CommonStorage(metaclass=ABCMeta):
    """
    Common class for Storage components.
    It provides `None` defaults for the attributes declared by `ProtocolStorage`.
    """

    json_content_type: Optional[str] = None
    multiline_processor: Optional[ProtocolMultiline] = None
    event_list_from_field_expander: Optional[ExpandEventListFromField] = None


# ProtocolStorageType is a type variable for any type satisfying `ProtocolStorage`
ProtocolStorageType = TypeVar("ProtocolStorageType", bound=ProtocolStorage)

# StorageDecoratorIterator yields a tuple of content (expressed as `StorageReader` or bytes), starting offset,
# ending offset, newline and optional offset of a list of expanded events
StorageDecoratorIterator: TypeAlias = Iterator[tuple[Union[StorageReader, bytes], int, int, bytes, Optional[int]]]

# StorageDecoratorCallable accepts a `ProtocolStorageType`, the range start offset, the content as BytesIO and a boolean
# flag indicating if the content is gzipped as arguments. It returns a `StorageDecoratorIterator`
StorageDecoratorCallable = Callable[[ProtocolStorageType, int, BytesIO, bool], StorageDecoratorIterator]