shippers/factory.py (58 lines of code) (raw):
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
from typing import Any, Callable
from share.config import ElasticsearchOutput, LogstashOutput, Output
from .es import ElasticsearchShipper
from .logstash import LogstashShipper
from .shipper import ProtocolShipper
# Registry of supported output types: each key is an output type name and each
# value is a definition dict whose "class" entry is the shipper class to build.
_init_definition_by_output: dict[str, dict[str, Any]] = {
    output_name: {"class": shipper_class}
    for output_name, shipper_class in (
        ("elasticsearch", ElasticsearchShipper),
        ("logstash", LogstashShipper),
    )
}
class ShipperFactory:
    """
    Factory for shippers.
    Exposes static methods that build the shipper registered for a given output type
    """

    @staticmethod
    def create_from_output(output_type: str, output: Output) -> ProtocolShipper:
        """
        Builds a concrete Shipper for `output_type`, reading the init kwargs from `output`.
        Raises ValueError when `output` is not an instance of the output class matching
        `output_type`, or when `output_type` is neither "elasticsearch" nor "logstash"
        """
        if output_type == "elasticsearch":
            if isinstance(output, ElasticsearchOutput):
                es_kwargs: dict[str, Any] = {
                    "elasticsearch_url": output.elasticsearch_url,
                    "username": output.username,
                    "password": output.password,
                    "cloud_id": output.cloud_id,
                    "api_key": output.api_key,
                    "es_datastream_name": output.es_datastream_name,
                    "tags": output.tags,
                    "batch_max_actions": output.batch_max_actions,
                    "batch_max_bytes": output.batch_max_bytes,
                    "ssl_assert_fingerprint": output.ssl_assert_fingerprint,
                    "es_dead_letter_index": output.es_dead_letter_index,
                }
                return ShipperFactory.create(output_type="elasticsearch", **es_kwargs)

            raise ValueError(f"output expected to be ElasticsearchOutput type, given {type(output)}")

        if output_type == "logstash":
            if isinstance(output, LogstashOutput):
                logstash_kwargs: dict[str, Any] = {
                    "logstash_url": output.logstash_url,
                    "username": output.username,
                    "password": output.password,
                    "max_batch_size": output.max_batch_size,
                    "compression_level": output.compression_level,
                    "ssl_assert_fingerprint": output.ssl_assert_fingerprint,
                    "tags": output.tags,
                }
                return ShipperFactory.create(output_type="logstash", **logstash_kwargs)

            raise ValueError(f"output expected to be LogstashOutput type, given {type(output)}")

        # Neither known branch matched: report the registered output types
        supported_outputs = ", ".join(_init_definition_by_output.keys())
        raise ValueError(f"You must provide one of the following outputs: {supported_outputs}")

    @staticmethod
    def create(output_type: str, **kwargs: Any) -> ProtocolShipper:
        """
        Builds a concrete Shipper by calling the "class" entry registered for `output_type`
        with `kwargs`. Raises ValueError when `output_type` is not registered
        """
        if output_type in _init_definition_by_output:
            shipper_builder: Callable[..., ProtocolShipper] = _init_definition_by_output[output_type]["class"]
            return shipper_builder(**kwargs)

        supported_outputs = ", ".join(_init_definition_by_output.keys())
        raise ValueError(f"You must provide one of the following outputs: {supported_outputs}")