share/config.py (419 lines of code) (raw):
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
from typing import Any, Callable, Optional, Union
import yaml
from .factory import MultilineFactory
from .include_exlude import IncludeExcludeFilter, IncludeExcludeRule
from .logger import logger as shared_logger
from .multiline import ProtocolMultiline
# Input types accepted by the `Input.type` setter
_available_input_types: list[str] = ["cloudwatch-logs", "s3-sqs", "sqs", "kinesis-data-stream"]
# Output types accepted by the `Output.type` setter and by `Input.add_output`
_available_output_types: list[str] = ["elasticsearch", "logstash"]
class Output:
    """
    Base class for Output component

    It only carries the output `type`, which is validated on every assignment.
    """

    def __init__(self, output_type: str):
        # Goes through the property setter below, so the validation applies here as well
        self.type = output_type

    @property
    def type(self) -> str:
        return self._type

    @type.setter
    def type(self, value: str) -> None:
        """
        Type setter.
        It accepts only a string listed in `_available_output_types`, otherwise raises `ValueError`
        """

        if not isinstance(value, str):
            raise ValueError("`type` must be provided as string")

        if value in _available_output_types:
            self._type = value
            return

        supported_types = ",".join(_available_output_types)
        raise ValueError(f"`type` must be one of {supported_types}: {value} given")
class ElasticsearchOutput(Output):
    """
    Output component for an Elasticsearch destination

    String and integer settings are exposed as properties whose setters raise `ValueError`
    when the value has the wrong type. `tags` is stored as a plain attribute without validation.
    """

    def __init__(
        self,
        elasticsearch_url: str = "",
        cloud_id: str = "",
        username: str = "",
        password: str = "",
        api_key: str = "",
        es_datastream_name: str = "",
        tags: Optional[list[str]] = None,
        batch_max_actions: int = 500,
        batch_max_bytes: int = 10 * 1024 * 1024,
        ssl_assert_fingerprint: str = "",
        es_dead_letter_index: str = "",
    ):
        """
        Validate and store the settings, then apply the cross-field rules:
        - `elasticsearch_url` wins over `cloud_id` when both are set (a warning is logged)
        - at least one of `username` or `api_key` is required, otherwise `ValueError` is raised
        - `api_key` wins over `username`/`password` when both are set (a warning is logged)
        - `username` without `password` raises `ValueError`
        """

        super().__init__(output_type="elasticsearch")

        self.elasticsearch_url = elasticsearch_url
        self.cloud_id = cloud_id
        self.username = username
        self.password = password
        self.api_key = api_key
        self.es_datastream_name = es_datastream_name
        # `None` stands for "no tags": each instance gets its own empty list instead of sharing
        # a single default list object. A list given by the caller is stored as is.
        self.tags = [] if tags is None else tags
        self.batch_max_actions = batch_max_actions
        self.batch_max_bytes = batch_max_bytes
        self.ssl_assert_fingerprint = ssl_assert_fingerprint
        self.es_dead_letter_index = es_dead_letter_index

        if self.cloud_id and self.elasticsearch_url:
            shared_logger.warning("both `elasticsearch_url` and `cloud_id` set in config: using `elasticsearch_url`")
            self.cloud_id = ""

        if not self.username and not self.api_key:
            raise ValueError("One between `username` and `password`, or `api_key` must be set")

        if self.username and self.api_key:
            shared_logger.warning("both `api_key` and `username` and `password` set in config: using `api_key`")
            self._username = ""
            self._password = ""

        # When `api_key` took precedence the username was cleared just above, so this check is skipped
        if self.username and not self.password:
            raise ValueError("`password` must be set when using `username`")

        if not self.es_datastream_name:
            shared_logger.debug("no `es_datastream_name` set in config")

        shared_logger.debug("tags: ", extra={"tags": self.tags})

    @property
    def elasticsearch_url(self) -> str:
        return self._elasticsearch_url

    @elasticsearch_url.setter
    def elasticsearch_url(self, value: str) -> None:
        if not isinstance(value, str):
            raise ValueError("`elasticsearch_url` must be provided as string")

        self._elasticsearch_url = value

    @property
    def username(self) -> str:
        return self._username

    @username.setter
    def username(self, value: str) -> None:
        if not isinstance(value, str):
            raise ValueError("`username` must be provided as string")

        self._username = value

    @property
    def password(self) -> str:
        return self._password

    @password.setter
    def password(self, value: str) -> None:
        if not isinstance(value, str):
            raise ValueError("`password` must be provided as string")

        self._password = value

    @property
    def cloud_id(self) -> str:
        return self._cloud_id

    @cloud_id.setter
    def cloud_id(self, value: str) -> None:
        if not isinstance(value, str):
            raise ValueError("`cloud_id` must be provided as string")

        self._cloud_id = value

    @property
    def api_key(self) -> str:
        return self._api_key

    @api_key.setter
    def api_key(self, value: str) -> None:
        if not isinstance(value, str):
            raise ValueError("`api_key` must be provided as string")

        self._api_key = value

    @property
    def es_datastream_name(self) -> str:
        return self._es_datastream_name

    @es_datastream_name.setter
    def es_datastream_name(self, value: str) -> None:
        if not isinstance(value, str):
            raise ValueError("`es_datastream_name` must be provided as string")

        self._es_datastream_name = value

    @property
    def batch_max_actions(self) -> int:
        return self._batch_max_actions

    @batch_max_actions.setter
    def batch_max_actions(self, value: int) -> None:
        if not isinstance(value, int):
            raise ValueError("`batch_max_actions` must be provided as integer")

        self._batch_max_actions = value

    @property
    def batch_max_bytes(self) -> int:
        return self._batch_max_bytes

    @batch_max_bytes.setter
    def batch_max_bytes(self, value: int) -> None:
        if not isinstance(value, int):
            raise ValueError("`batch_max_bytes` must be provided as integer")

        self._batch_max_bytes = value

    @property
    def ssl_assert_fingerprint(self) -> str:
        return self._ssl_assert_fingerprint

    @ssl_assert_fingerprint.setter
    def ssl_assert_fingerprint(self, value: str) -> None:
        if not isinstance(value, str):
            raise ValueError("`ssl_assert_fingerprint` must be provided as string")

        self._ssl_assert_fingerprint = value

    @property
    def es_dead_letter_index(self) -> str:
        return self._es_dead_letter_index

    @es_dead_letter_index.setter
    def es_dead_letter_index(self, value: str) -> None:
        if not isinstance(value, str):
            raise ValueError("`es_dead_letter_index` must be provided as string")

        self._es_dead_letter_index = value
class LogstashOutput(Output):
    """
    Output component for a Logstash destination

    String and integer settings are exposed as properties whose setters raise `ValueError`
    when the value has the wrong type. `tags` is stored as a plain attribute without validation.
    """

    def __init__(
        self,
        logstash_url: str = "",
        username: str = "",
        password: str = "",
        max_batch_size: int = 500,
        compression_level: int = 9,
        tags: Optional[list[str]] = None,
        ssl_assert_fingerprint: str = "",
    ) -> None:
        """
        Validate and store the settings.
        Raises `ValueError` when `username` is set without `password`
        """

        super().__init__(output_type="logstash")

        self.logstash_url = logstash_url
        self.username = username
        self.password = password
        self.max_batch_size = max_batch_size
        self.compression_level = compression_level
        # `None` stands for "no tags": each instance gets its own empty list instead of sharing
        # a single default list object. A list given by the caller is stored as is.
        self.tags = [] if tags is None else tags
        self.ssl_assert_fingerprint = ssl_assert_fingerprint

        if self.username and not self.password:
            raise ValueError("`password` must be set when using `username`")

        shared_logger.debug("tags: ", extra={"tags": self.tags})

    @property
    def logstash_url(self) -> str:
        return self._logstash_url

    @logstash_url.setter
    def logstash_url(self, value: str) -> None:
        if not isinstance(value, str):
            raise ValueError("`logstash_url` must be provided as string")

        self._logstash_url = value

    @property
    def username(self) -> str:
        return self._username

    @username.setter
    def username(self, value: str) -> None:
        if not isinstance(value, str):
            raise ValueError("`username` must be provided as string")

        self._username = value

    @property
    def password(self) -> str:
        return self._password

    @password.setter
    def password(self, value: str) -> None:
        if not isinstance(value, str):
            raise ValueError("`password` must be provided as string")

        self._password = value

    @property
    def max_batch_size(self) -> int:
        return self._max_batch_size

    @max_batch_size.setter
    def max_batch_size(self, value: int) -> None:
        if not isinstance(value, int):
            raise ValueError("`max_batch_size` must be provided as int")

        self._max_batch_size = value

    @property
    def compression_level(self) -> int:
        return self._compression_level

    @compression_level.setter
    def compression_level(self, value: int) -> None:
        if not isinstance(value, int):
            raise ValueError("`compression_level` must be provided as int")

        self._compression_level = value

    @property
    def ssl_assert_fingerprint(self) -> str:
        return self._ssl_assert_fingerprint

    @ssl_assert_fingerprint.setter
    def ssl_assert_fingerprint(self, value: str) -> None:
        if not isinstance(value, str):
            raise ValueError("`ssl_assert_fingerprint` must be provided as string")

        self._ssl_assert_fingerprint = value
class Input:
    """
    Base class for Input component

    It holds the validated settings of a single input, its optional multiline processor and
    include/exclude filter, and its outputs indexed by destination.
    """

    def __init__(self, input_type: str, input_id: str):
        # Both assignments go through the property setters below, so they are validated
        self.id = input_id
        self.type = input_type

        self._tags: list[str] = []
        self._json_content_type: str = ""
        self._expand_event_list_from_field: str = ""
        self._root_fields_to_add_to_expanded_event: Optional[Union[str, list[str]]] = None
        # Outputs are indexed by destination (see `add_output` for how the destination is chosen)
        self._outputs: dict[str, Output] = {}

        self._multiline_processor: Optional[ProtocolMultiline] = None
        self._include_exclude_filter: Optional[IncludeExcludeFilter] = None

        self._valid_json_content_type: list[str] = ["ndjson", "single", "disabled"]

    @property
    def type(self) -> str:
        return self._type

    @type.setter
    def type(self, value: str) -> None:
        """
        Type setter.
        It accepts only a string listed in `_available_input_types`, otherwise raises `ValueError`
        """

        if not isinstance(value, str):
            raise ValueError("`type` must be provided as string")

        if value not in _available_input_types:
            raise ValueError(f"`type` must be one of {','.join(_available_input_types)}: {value} given")

        self._type = value

    @property
    def id(self) -> str:
        return self._id

    @id.setter
    def id(self, value: str) -> None:
        if not isinstance(value, str):
            raise ValueError("`id` must be provided as string")

        self._id = value

    @property
    def tags(self) -> list[str]:
        """
        Tags getter.
        Returns all tags
        """

        return self._tags

    @tags.setter
    def tags(self, values: list[str]) -> None:
        """
        Tags setter.
        It receives a list of tags and performs type validation.
        Raises `ValueError` when `values` is not a list or when any tag is not a string
        """

        if not isinstance(values, list):
            raise ValueError(f"`tags` must be provided as list for input {self.id}")

        # Validate before assigning, so that a rejected value leaves the current tags untouched
        if not all(isinstance(value, str) for value in values):
            raise ValueError(f"Each tag in `tags` must be provided as string for input {self.id}, given: {values}")

        # Store a copy: the input does not keep a reference to the list given by the caller
        self._tags = list(values)

    @property
    def expand_event_list_from_field(self) -> str:
        return self._expand_event_list_from_field

    @expand_event_list_from_field.setter
    def expand_event_list_from_field(self, value: str) -> None:
        if not isinstance(value, str):
            raise ValueError(f"`expand_event_list_from_field` must be provided as string for input {self.id}")

        self._expand_event_list_from_field = value

    @property
    def root_fields_to_add_to_expanded_event(self) -> Optional[Union[str, list[str]]]:
        return self._root_fields_to_add_to_expanded_event

    @root_fields_to_add_to_expanded_event.setter
    def root_fields_to_add_to_expanded_event(self, value: Union[str, list[str]]) -> None:
        """
        It accepts either the literal string `all` or a list, anything else raises `ValueError`.
        The elements of the list are not validated here
        """

        if isinstance(value, str) and value == "all":
            self._root_fields_to_add_to_expanded_event = value
            return

        if isinstance(value, list):
            self._root_fields_to_add_to_expanded_event = value
            return

        raise ValueError("`root_fields_to_add_to_expanded_event` must be provided as `all` or a list of strings")

    @property
    def json_content_type(self) -> str:
        return self._json_content_type

    @json_content_type.setter
    def json_content_type(self, value: str) -> None:
        if value not in self._valid_json_content_type:
            raise ValueError(
                f"`json_content_type` must be one of {','.join(self._valid_json_content_type)} "
                f"for input {self.id}: {value} given"
            )

        self._json_content_type = value

    @property
    def include_exclude_filter(self) -> Optional[IncludeExcludeFilter]:
        return self._include_exclude_filter

    @include_exclude_filter.setter
    def include_exclude_filter(self, value: IncludeExcludeFilter) -> None:
        if not isinstance(value, IncludeExcludeFilter):
            raise ValueError(f"An error occurred while setting include and exclude filter for input {self.id}")

        self._include_exclude_filter = value

    def get_output_by_destination(self, output_destination: str) -> Optional[Output]:
        """
        Output getter.
        Returns a specific output given its destination, or None when no output has that destination
        """

        return self._outputs.get(output_destination)

    def get_output_destinations(self) -> list[str]:
        """
        Output destinations getter.
        Returns all the defined output destinations
        """

        return list(self._outputs.keys())

    def delete_output_by_destination(self, output_destination: str) -> None:
        """
        Output deleter.
        Delete a defined output by its destination.
        Raises `KeyError` when no output has that destination
        """

        del self._outputs[output_destination]

    def add_output(self, output_type: str, **kwargs: Any) -> None:
        """
        Output setter.
        Set an output given its type and init kwargs.
        The output is indexed by its destination: `elasticsearch_url` (or `cloud_id` when no
        `elasticsearch_url` kwarg is given) for elasticsearch, `logstash_url` for logstash.
        Raises `ValueError` for an unsupported type, a missing destination or a duplicated one
        """

        if output_type not in _available_output_types:
            raise ValueError(f"Type {output_type} is not included in the supported types: {_available_output_types}")

        output_dest = ""
        if output_type == "elasticsearch":
            if "cloud_id" not in kwargs and "elasticsearch_url" not in kwargs:
                raise ValueError("Either `elasticsearch_url` or `cloud_id` must be set")
            # elasticsearch_url takes precedence over cloud_id
            if "elasticsearch_url" not in kwargs:
                output_dest = kwargs["cloud_id"]
            else:
                output_dest = kwargs["elasticsearch_url"]
        elif output_type == "logstash":
            if "logstash_url" not in kwargs:
                raise ValueError(f"Output type {output_type} requires logstash_url to be set")
            output_dest = kwargs["logstash_url"]

        if output_dest in self._outputs:
            # Since logstash destination can only be set as logstash_url, we do not have to account
            # for the same url/cloud_id for both types logstash or elasticsearch
            raise ValueError(f"Duplicated output destination {output_dest} for type {output_type}")

        output: Optional[Output] = None
        if output_type == "elasticsearch":
            output = ElasticsearchOutput(**kwargs)
        elif output_type == "logstash":
            output = LogstashOutput(**kwargs)
        else:
            # Only reachable for a type listed in `_available_output_types` without a dedicated class
            output = Output(output_type=output_type)

        self._outputs[output_dest] = output

    def get_multiline_processor(self) -> Optional[ProtocolMultiline]:
        return self._multiline_processor

    def add_multiline_processor(self, multiline_type: str, **kwargs: Any) -> None:
        """
        Multiline setter.
        Set a multiline processor given its type and init kwargs.
        The processor is built by `MultilineFactory.create` and replaces any previous one
        """

        self._multiline_processor = MultilineFactory.create(multiline_type=multiline_type, **kwargs)
class Config:
    """
    Config component

    Registry of the configured inputs, indexed by input id.
    """

    def __init__(self) -> None:
        self._inputs: dict[str, Input] = {}

    def get_input_by_id(self, input_id: str) -> Optional[Input]:
        """
        Input getter.
        Returns the input registered under the given id, or None when there is none
        """

        return self._inputs.get(input_id)

    def add_input(self, new_input: Input) -> None:
        """
        Input setter.
        Registers an input under its id; an id that is already registered raises `ValueError`
        """

        input_id = new_input.id
        if input_id not in self._inputs:
            self._inputs[input_id] = new_input
            return

        raise ValueError(f"Duplicated input with id {input_id}")
def _rules_from_config(input_config: dict[str, Any], setting: str) -> list[IncludeExcludeRule]:
    """
    Build the list of `IncludeExcludeRule` for the `include` or `exclude` setting of an input.
    Returns an empty list when the setting is absent, raises `ValueError` when it is not a list
    """

    if setting not in input_config:
        return []

    patterns = input_config[setting]
    if not isinstance(patterns, list):
        raise ValueError(f'`{setting}` must be provided as list for input {input_config["id"]}')

    # Every entry is converted to string before being used as pattern
    return [IncludeExcludeRule(pattern=str(pattern)) for pattern in patterns]


def parse_config(config_yaml: str, expanders: Optional[list[Callable[[str], str]]] = None) -> Config:
    """
    Config component factory
    Given a config yaml as string it returns the Config instance as defined by the yaml

    The expanders, when given, are applied in order to the yaml text before it is parsed.
    Raises `AssertionError` when the parsed yaml is not a mapping and `ValueError` for any
    missing or invalid setting
    """

    for expander in expanders or []:
        config_yaml = expander(config_yaml)

    yaml_config = yaml.safe_load(config_yaml)
    # NOTE(review): kept as `assert` so that the exception type stays AssertionError;
    # be aware that asserts are skipped when python runs with -O
    assert isinstance(yaml_config, dict)

    conf: Config = Config()

    if "inputs" not in yaml_config or not isinstance(yaml_config["inputs"], list):
        raise ValueError("`inputs` must be provided as list")

    for input_n, input_config in enumerate(yaml_config["inputs"]):
        # The mapping check comes first so that a scalar or list entry gets the same descriptive error
        if (
            not isinstance(input_config, dict)
            or "id" not in input_config
            or not isinstance(input_config["id"], str)
        ):
            raise ValueError(f"`id` must be provided as string for input at position {input_n + 1}")

        if "type" not in input_config or not isinstance(input_config["type"], str):
            raise ValueError(f'`type` must be provided as string for input {input_config["id"]}')

        try:
            current_input: Input = Input(input_type=input_config["type"], input_id=input_config["id"])
        except ValueError as e:
            raise ValueError(
                f'An error occurred while applying type configuration for input {input_config["id"]}: {e}'
            ) from e

        if "tags" in input_config:
            current_input.tags = input_config["tags"]

        if "multiline" in input_config:
            if not isinstance(input_config["multiline"], dict):
                raise ValueError(f'`multiline` must be provided as dictionary for input {input_config["id"]}')

            multiline_config = input_config["multiline"]
            if "type" not in multiline_config or not isinstance(multiline_config["type"], str):
                raise ValueError(
                    f'`type` must be provided as string in multiline configuration for input {input_config["id"]}'
                )

            # The kwargs are built in a new dict: the parsed mapping may be shared between several
            # inputs through a yaml anchor/alias, so removing `type` from it in place would make
            # the next input using the same mapping fail the `type` check above
            multiline_kwargs = {key: value for key, value in multiline_config.items() if key != "type"}
            multiline_kwargs["multiline_type"] = multiline_config["type"]

            try:
                current_input.add_multiline_processor(**multiline_kwargs)
            except ValueError as e:
                raise ValueError(
                    f'An error occurred while applying multiline configuration for input {input_config["id"]}: {e}'
                ) from e

        if "expand_event_list_from_field" in input_config:
            current_input.expand_event_list_from_field = input_config["expand_event_list_from_field"]

        if "root_fields_to_add_to_expanded_event" in input_config:
            current_input.root_fields_to_add_to_expanded_event = input_config["root_fields_to_add_to_expanded_event"]

        if "json_content_type" in input_config:
            current_input.json_content_type = input_config["json_content_type"]

        include_rules = _rules_from_config(input_config, "include")
        exclude_rules = _rules_from_config(input_config, "exclude")

        # The filter is set only when at least one rule is defined
        if include_rules or exclude_rules:
            current_input.include_exclude_filter = IncludeExcludeFilter(
                include_patterns=include_rules, exclude_patterns=exclude_rules
            )

        if "outputs" not in input_config or not isinstance(input_config["outputs"], list):
            raise ValueError(f'`outputs` must be provided as list for input {input_config["id"]}')

        for output_n, output_config in enumerate(input_config["outputs"]):
            if (
                not isinstance(output_config, dict)
                or "type" not in output_config
                or not isinstance(output_config["type"], str)
            ):
                raise ValueError(
                    f"`type` for output configuration at position {output_n + 1} must "
                    f'be provided as string for input {input_config["id"]}'
                )

            if "args" not in output_config or not isinstance(output_config["args"], dict):
                raise ValueError(
                    f"`args` for output configuration at position {output_n + 1} "
                    f'must be provided as dictionary for input {input_config["id"]}'
                )

            # The tags of the input always override any `tags` given in the output args;
            # the parsed mapping itself is left untouched
            output_args = {**output_config["args"], "tags": current_input.tags}

            try:
                current_input.add_output(output_type=output_config["type"], **output_args)
            except ValueError as e:
                raise ValueError(
                    f"An error occurred while applying output configuration "
                    f'at position {output_n + 1} for input {input_config["id"]}: {e}'
                ) from e

        conf.add_input(current_input)

    return conf