storage/s3.py
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
from io import SEEK_SET, BytesIO
from typing import Any, Optional

import boto3
import botocore.client
import elasticapm # noqa: F401
from botocore.response import StreamingBody

from share import ExpandEventListFromField, ProtocolMultiline, shared_logger

from .decorator import by_lines, inflate, json_collector, multi_line
from .storage import (
CHUNK_SIZE,
CommonStorage,
GetByLinesIterator,
StorageDecoratorIterator,
StorageReader,
is_gzip_content,
)


class S3Storage(CommonStorage):
"""
S3 Storage.
This class implements concrete S3 Storage
"""
_s3_client = boto3.client(
"s3", config=botocore.client.Config(retries={"total_max_attempts": 10, "mode": "standard"})
)
def __init__(
self,
bucket_name: str,
object_key: str,
json_content_type: Optional[str] = None,
multiline_processor: Optional[ProtocolMultiline] = None,
event_list_from_field_expander: Optional[ExpandEventListFromField] = None,
):
self._bucket_name: str = bucket_name
self._object_key: str = object_key
        self.json_content_type: Optional[str] = json_content_type
        self.multiline_processor: Optional[ProtocolMultiline] = multiline_processor
        self.event_list_from_field_expander: Optional[ExpandEventListFromField] = event_list_from_field_expander
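
    # The decorators below are applied bottom-up: `inflate` consumes the raw
    # chunks yielded by `_generate` and decompresses gzip content, `by_lines`
    # splits the stream into newline-delimited events, `json_collector`
    # assembles JSON objects spanning multiple lines, and `multi_line` applies
    # the optional multiline processor. Callers of `_generate` therefore
    # receive processed events rather than raw chunks.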
@multi_line
@json_collector
@by_lines
@inflate
def _generate(self, range_start: int, body: BytesIO, is_gzipped: bool) -> StorageDecoratorIterator:
"""
Concrete implementation of the iterator for get_by_lines
"""
file_ending_offset: int = range_start
def chunk_lambda() -> Any:
return body.read(CHUNK_SIZE)
if is_gzipped:
            # Gzipped content is handed over whole: the `inflate` decorator
            # takes care of decompressing it.
            reader: StorageReader = StorageReader(raw=body)
            yield reader, 0, 0, b"", None
        else:
            for chunk in iter(chunk_lambda, b""):
                file_starting_offset = file_ending_offset
                file_ending_offset += len(chunk)

                shared_logger.debug("_generate flat", extra={"offset": file_ending_offset})
                # Offsets are yielded as (starting, ending), matching the
                # unpack order in `get_by_lines` below.
                yield chunk, file_starting_offset, file_ending_offset, b"", None

    def get_by_lines(self, range_start: int) -> GetByLinesIterator:
        """
        Concrete implementation of get_by_lines: downloads the object, detects
        gzip content and yields log events together with their byte offsets.
        """
        original_range_start: int = range_start
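
        # Fetch the object metadata first: the content length drives the
        # range check below, before the object itself is downloaded.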
s3_object_head = self._s3_client.head_object(Bucket=self._bucket_name, Key=self._object_key)
content_type: str = s3_object_head["ContentType"]
content_length: int = s3_object_head["ContentLength"]
shared_logger.debug(
"get_by_lines",
extra={
"content_type": content_type,
"range_start": range_start,
"bucket_name": self._bucket_name,
"object_key": self._object_key,
},
)
        # Download the whole object into an in-memory buffer and rewind it,
        # so it can be positioned at the requested offset below.
        file_content: BytesIO = BytesIO(b"")
        self._s3_client.download_fileobj(self._bucket_name, self._object_key, file_content)
        file_content.flush()
        file_content.seek(0, SEEK_SET)
        # Detect gzip from the first bytes of the object: compressed content
        # has to be read from the beginning, so the resume offset is reset here
        # (resuming at range_start is handled on the decompressed stream).
        is_gzipped: bool = False
        if is_gzip_content(file_content.readline()):
            is_gzipped = True
            range_start = 0

if range_start < content_length:
file_content.seek(range_start, SEEK_SET)
for log_event, line_starting_offset, line_ending_offset, _, event_expanded_offset in self._generate(
original_range_start, file_content, is_gzipped
):
assert isinstance(log_event, bytes)
yield log_event, line_starting_offset, line_ending_offset, event_expanded_offset
else:
            shared_logger.info(f"requested file content from {range_start}, file size {content_length}: skipping")

    def get_as_string(self) -> str:
shared_logger.debug("get_as_string", extra={"bucket_name": self._bucket_name, "object_key": self._object_key})
s3_object = self._s3_client.get_object(Bucket=self._bucket_name, Key=self._object_key, Range="bytes=0-")
body: StreamingBody = s3_object["Body"]
return str(body.read(s3_object["ContentLength"]).decode("utf-8"))
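

# A minimal usage sketch, not part of the original module: the bucket and key
# names are hypothetical, and it assumes this file is run inside its package
# (e.g. via `python -m`) so that `share` and `.decorator` resolve. It only
# illustrates how the two public methods are meant to be consumed.
if __name__ == "__main__":
    storage = S3Storage(bucket_name="my-bucket", object_key="logs/app.log")

    # Stream the object as individual events with byte offsets; passing the
    # last ending offset back as range_start would resume a previous run.
    for log_event, starting_offset, ending_offset, expanded_offset in storage.get_by_lines(range_start=0):
        print(starting_offset, ending_offset, log_event)

    # Or fetch the whole object at once as UTF-8 text.
    print(storage.get_as_string())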