storage/factory.py (36 lines of code) (raw):
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
from typing import Any, Callable, Optional
from share import ExpandEventListFromField, ProtocolMultiline, json_dumper
from .payload import PayloadStorage
from .s3 import S3Storage
from .storage import ProtocolStorage
# Registry of the storage backends known to the factory, keyed by storage type.
# Each entry holds:
#   "class":  the callable used to build the storage
#   "kwargs": names of the init kwargs that must be provided (and be non-empty)
_init_definition_by_storage_type: dict[str, dict[str, Any]] = {
    "s3": {
        "class": S3Storage,
        "kwargs": ["bucket_name", "object_key"],
    },
    "payload": {
        "class": PayloadStorage,
        "kwargs": ["payload"],
    },
}
class StorageFactory:
    """
    Factory for Storage instances.
    Exposes a static entry point that builds a Storage from its type name
    """

    @staticmethod
    def create(
        storage_type: str,
        json_content_type: Optional[str] = None,
        event_list_from_field_expander: Optional[ExpandEventListFromField] = None,
        multiline_processor: Optional[ProtocolMultiline] = None,
        **kwargs: Any,
    ) -> ProtocolStorage:
        """
        Builds the concrete Storage registered under `storage_type`.

        `kwargs` must contain a truthy value for every init kwarg the registry
        lists for that storage type; the whole of `kwargs`, together with
        `json_content_type`, `multiline_processor` and
        `event_list_from_field_expander`, is forwarded to the storage class.

        Raises ValueError when `storage_type` is not registered, or when a
        required init kwarg is missing or empty.
        """

        definition = _init_definition_by_storage_type.get(storage_type)
        if definition is None:
            raise ValueError(
                "You must provide one of the following storage types: "
                + f"{', '.join(_init_definition_by_storage_type.keys())}"
            )

        required_kwargs = definition["kwargs"]

        # Count the required kwargs that were actually supplied with a non-empty value
        provided_count = sum(1 for name, value in kwargs.items() if name in required_kwargs and value)
        if provided_count != len(required_kwargs):
            raise ValueError(
                f"You must provide the following not empty init kwargs for {storage_type}: "
                + f"{', '.join(required_kwargs)}. (provided: {json_dumper(kwargs)})"
            )

        # The shared options are always handed to the storage, even when left at None
        kwargs.update(
            json_content_type=json_content_type,
            multiline_processor=multiline_processor,
            event_list_from_field_expander=event_list_from_field_expander,
        )

        builder: Callable[..., ProtocolStorage] = definition["class"]
        return builder(**kwargs)