# shippers/composite.py (44 lines of code) (raw):
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
from copy import deepcopy
from typing import Any, Optional
from share import IncludeExcludeFilter, shared_logger
from .shipper import (
EVENT_IS_EMPTY,
EVENT_IS_FILTERED,
EVENT_IS_SENT,
EventIdGeneratorCallable,
ProtocolShipper,
ReplayHandlerCallable,
)
class CompositeShipper:
    """
    Composite Shipper.
    Fans every accepted event out to all the shippers registered on it
    """

    def __init__(self, **kwargs: Any):
        # Keyword arguments are accepted but not used by this class.
        self._include_exclude_filter: Optional[IncludeExcludeFilter] = None
        self._shippers: list[ProtocolShipper] = []

    def add_include_exclude_filter(self, include_exclude_filter: Optional[IncludeExcludeFilter]) -> None:
        """
        IncludeExcludeFilter setter.
        Replace the filter consulted by send(); None turns filtering off
        """

        self._include_exclude_filter = include_exclude_filter

    def add_shipper(self, shipper: ProtocolShipper) -> None:
        """
        Shipper setter.
        Register one more shipper at the end of the fan-out list
        """

        self._shippers.append(shipper)

    def set_event_id_generator(self, event_id_generator: EventIdGeneratorCallable) -> None:
        """
        Event id generator setter.
        Hand the generator to every shipper registered so far
        """

        for child in self._shippers:
            child.set_event_id_generator(event_id_generator=event_id_generator)

    def set_replay_handler(self, replay_handler: ReplayHandlerCallable) -> None:
        """
        Replay handler setter.
        Hand the handler to every shipper registered so far
        """

        for child in self._shippers:
            child.set_replay_handler(replay_handler=replay_handler)

    def send(self, event: dict[str, Any]) -> str:
        """
        Event sender.
        Return EVENT_IS_EMPTY when the event message is missing or only whitespace,
        EVENT_IS_FILTERED when the include/exclude filter rejects the message,
        otherwise pass a deep copy of the event to each shipper and return EVENT_IS_SENT.
        The values returned by the individual shippers are ignored
        """

        # event["fields"]["message"] wins over a top-level event["message"]
        if "fields" in event and "message" in event["fields"]:
            text: str = event["fields"]["message"]
        elif "message" in event:
            text = event["message"]
        else:
            text = ""

        if not text.strip():
            shared_logger.debug("event is empty: message is zero length")
            return EVENT_IS_EMPTY

        event_filter = self._include_exclude_filter
        if event_filter is not None and not event_filter.filter(text):
            shared_logger.debug("event is filtered according to filter rules")
            return EVENT_IS_FILTERED

        for child in self._shippers:
            # every shipper works on its own deep copy, so whatever one of them
            # mutates never leaks into the caller's event or the other shippers
            child.send(deepcopy(event))

        return EVENT_IS_SENT

    def flush(self) -> None:
        """
        Flusher.
        Call flush() on every registered shipper, in registration order
        """

        for child in self._shippers:
            child.flush()