# storage/payload.py
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
import base64
import binascii
import gzip
from io import SEEK_SET, BytesIO
from typing import Any, Optional, Tuple

from share import ExpandEventListFromField, ProtocolMultiline, shared_logger

from .decorator import by_lines, inflate, json_collector, multi_line
from .storage import (
    CHUNK_SIZE,
    CommonStorage,
    GetByLinesIterator,
    StorageDecoratorIterator,
    StorageReader,
    is_gzip_content,
)
class PayloadStorage(CommonStorage):
    """
    PayloadStorage Storage.

    Concrete `CommonStorage` implementation that reads log content from an
    in-memory payload string. The payload may be plain text, bare gzip
    content, base64-encoded text, or base64-encoded gzip; the encoding is
    detected automatically (see `_decode_payload`).
    """

    def __init__(
        self,
        payload: str,
        json_content_type: Optional[str] = None,
        multiline_processor: Optional[ProtocolMultiline] = None,
        event_list_from_field_expander: Optional[ExpandEventListFromField] = None,
    ):
        # Raw payload as received; decoding is deferred to `_decode_payload()`.
        self._payload: str = payload
        # NOTE: the attributes below are presumably consumed by the decorator
        # chain applied to `_generate` (json_collector/multi_line/...), not by
        # this class's own code — confirm against the decorator module.
        self.json_content_type = json_content_type
        self.multiline_processor = multiline_processor
        self.event_list_from_field_expander = event_list_from_field_expander

    def _decode_payload(self) -> Tuple[bytes, bool]:
        """
        Decode `self._payload` to raw bytes, transparently handling base64.

        Returns `(content, is_b64encoded)` where `content` may still be gzip
        compressed and `is_b64encoded` reports whether the payload was
        detected as base64 encoded.

        Detection heuristic: attempt a strict base64 decode. If the decoded
        bytes are gzip content, accept them as-is (gzip bytes would never be
        valid unicode anyway). Otherwise require the decoded bytes to be valid
        UTF-8 before trusting the base64 decode, because `b64decode` can
        "succeed" on plain text that was never base64 encoded. Any failure
        (`binascii.Error`/`ValueError` from decoding, `UnicodeDecodeError`
        from validation) means the payload is taken verbatim as UTF-8 text.
        """
        try:
            decoded = base64.b64decode(self._payload, validate=True)
            if not is_gzip_content(decoded):
                # Raises UnicodeDecodeError for non-unicode results, which
                # means the original payload was not really base64 encoded.
                decoded.decode("utf-8")
            return decoded, True
        except (UnicodeDecodeError, ValueError, binascii.Error):
            # Not valid base64 (or base64 of non-unicode, non-gzip bytes):
            # take the payload as it is and encode it to unicode bytes.
            return self._payload.encode("utf-8"), False

    @multi_line
    @json_collector
    @by_lines
    @inflate
    def _generate(self, range_start: int, body: BytesIO, is_gzipped: bool) -> StorageDecoratorIterator:
        """
        Concrete implementation of the iterator for `get_by_lines`.

        Yields raw chunks (or a whole-body `StorageReader` for gzip content)
        to the decorator chain above, which handles inflation, line splitting,
        JSON collection and multiline grouping.
        """
        file_ending_offset: int = range_start

        def chunk_lambda() -> Any:
            return body.read(CHUNK_SIZE)

        if is_gzipped:
            # Hand the whole compressed body to the `inflate` decorator;
            # offsets for gzip content are computed downstream.
            reader: StorageReader = StorageReader(raw=body)
            yield reader, 0, 0, b"", None
        else:
            for chunk in iter(chunk_lambda, b""):
                file_starting_offset = file_ending_offset
                file_ending_offset += len(chunk)

                shared_logger.debug("_generate flat", extra={"offset": file_ending_offset})
                yield chunk, file_starting_offset, file_ending_offset, b"", None

    def get_by_lines(self, range_start: int) -> GetByLinesIterator:
        """
        Iterate over the payload content starting at byte offset
        `range_start`, yielding `(event, line_starting_offset,
        line_ending_offset, event_expanded_offset)` tuples.

        If `range_start` is past the end of the decoded content, nothing is
        yielded and an informational message is logged.
        """
        original_range_start: int = range_start

        is_gzipped: bool = False
        base64_decoded, is_b64encoded = self._decode_payload()

        if is_gzip_content(base64_decoded):
            is_gzipped = True
            # Always feed the compressed body from byte 0; the decorator
            # chain receives `original_range_start` below and handles the
            # skipping on the inflated stream.
            range_start = 0

        shared_logger.debug(
            "get_by_lines",
            extra={
                "range_start": original_range_start,
                "is_b64encoded": is_b64encoded,
                "is_gzipped": is_gzipped,
            },
        )

        content_length = len(base64_decoded)
        if range_start < content_length:
            file_content: BytesIO = BytesIO(base64_decoded)
            file_content.seek(range_start, SEEK_SET)

            for log_event, line_starting_offset, line_ending_offset, _, event_expanded_offset in self._generate(
                original_range_start, file_content, is_gzipped
            ):
                assert isinstance(log_event, bytes)
                yield log_event, line_starting_offset, line_ending_offset, event_expanded_offset
        else:
            shared_logger.info(f"requested payload content from {range_start}, payload size {content_length}: skip it")

    def get_as_string(self) -> str:
        """
        Return the whole payload as a unicode string, base64-decoding and/or
        gzip-decompressing it as needed.
        """
        base64_decoded, _ = self._decode_payload()

        if is_gzip_content(base64_decoded):
            return gzip.decompress(base64_decoded).decode("utf-8")

        return base64_decoded.decode("utf-8")