shippers/logstash.py

# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.

import gzip
from typing import Any, Optional

from requests import Session
from requests.adapters import HTTPAdapter
from requests.exceptions import RequestException
from urllib3.util.retry import Retry

from share import json_dumper, normalise_event, shared_logger
from shippers.shipper import EventIdGeneratorCallable, ReplayHandlerCallable

_EVENT_SENT = "_EVENT_SENT"
_EVENT_BUFFERED = "_EVENT_BUFFERED"

_TIMEOUT = 10
_MAX_RETRIES = 4
_STATUS_FORCE_LIST = [429, 500, 502, 503, 504]

# A backoff factor to apply between attempts after the second try. urllib3 will sleep for:
# {backoff factor} * (2 ** ({number of total retries} - 1))
# seconds. If the backoff_factor is 1, then sleep() will sleep for [0s, 2s, 4s, …] between retries.
_BACKOFF_FACTOR = 1
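# For example, applying the formula above with _MAX_RETRIES = 4 and
# _BACKOFF_FACTOR = 1: a request that keeps failing with a status in
# _STATUS_FORCE_LIST is retried up to four times, with sleeps growing as
# 0s, 2s, 4s, 8s between attempts (exact timing depends on the installed
# urllib3 version) before the request is given up on.
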
class LogstashAdapter(HTTPAdapter):
    """
    An HTTP adapter specific to Logstash that encapsulates the retry/backoff
    parameters and allows certificates to be verified by SSL fingerprint.
    """

    def __init__(self, fingerprint: str, *args, **kwargs):  # type: ignore
        self._fingerprint = fingerprint

        retry_strategy = Retry(total=_MAX_RETRIES, backoff_factor=_BACKOFF_FACTOR, status_forcelist=_STATUS_FORCE_LIST)

        HTTPAdapter.__init__(self, max_retries=retry_strategy, *args, **kwargs)  # type: ignore

    def init_poolmanager(self, *args, **kwargs):  # type: ignore
        # Pin the server certificate by its fingerprint, if one was provided.
        if self._fingerprint:
            kwargs["assert_fingerprint"] = self._fingerprint

        return super().init_poolmanager(*args, **kwargs)  # type: ignore


class LogstashShipper:
    """
    Logstash shipper.
    This class implements the concrete Logstash shipper.
    """

    def __init__(
        self,
        logstash_url: str = "",
        username: str = "",
        password: str = "",
        max_batch_size: int = 1,
        compression_level: int = 9,
        ssl_assert_fingerprint: str = "",
        tags: list[str] = [],
    ) -> None:
        if logstash_url:
            self._logstash_url = logstash_url
        else:
            raise ValueError("You must provide logstash_url")

        self._replay_handler: Optional[ReplayHandlerCallable] = None
        self._event_id_generator: Optional[EventIdGeneratorCallable] = None

        self._events_batch: list[dict[str, Any]] = []
        self._max_batch_size = max_batch_size
        self._tags = tags

        if 0 <= compression_level <= 9:
            self._compression_level = compression_level
        else:
            raise ValueError("compression_level must be an integer value between 0 and 9")

        self._replay_args: dict[str, Any] = {}

        self._session = self._get_session(self._logstash_url, username, password, ssl_assert_fingerprint)

    @staticmethod
    def _get_session(url: str, username: str, password: str, ssl_assert_fingerprint: str) -> Session:
        session = Session()
        if username:
            session.auth = (username, password)

        if ssl_assert_fingerprint:
            # Standard CA verification is disabled because the adapter verifies
            # the server certificate by fingerprint instead.
            session.verify = False

        session.mount(url, LogstashAdapter(ssl_assert_fingerprint))
        return session

    def send(self, event: dict[str, Any]) -> str:
        if "_id" not in event and self._event_id_generator is not None:
            event["_id"] = self._event_id_generator(event)

        event["tags"] = ["forwarded"]
        event["tags"] += self._tags

        event = normalise_event(event)

        # Let's move _id to @metadata._id for logstash
        if "_id" in event:
            event["@metadata"] = {"_id": event["_id"]}
            del event["_id"]

        self._events_batch.append(event)
        if len(self._events_batch) < self._max_batch_size:
            return _EVENT_BUFFERED

        self._send()
        return _EVENT_SENT

    def set_event_id_generator(self, event_id_generator: EventIdGeneratorCallable) -> None:
        self._event_id_generator = event_id_generator

    def set_replay_handler(self, replay_handler: ReplayHandlerCallable) -> None:
        self._replay_handler = replay_handler

    def flush(self) -> None:
        if len(self._events_batch) > 0:
            self._send()

        self._events_batch = []

    def _send(self) -> None:
        ndjson = "\n".join(json_dumper(event) for event in self._events_batch)
        try:
            response = self._session.put(
                self._logstash_url,
                data=gzip.compress(ndjson.encode("utf-8"), self._compression_level),
                headers={"Content-Encoding": "gzip", "Content-Type": "application/x-ndjson"},
                timeout=_TIMEOUT,
            )
            if response.status_code == 401:
                raise RequestException("Authentication error")
        except RequestException as e:
            shared_logger.error(
                f"logstash shipper encountered an error while publishing events to logstash. Error: {str(e)}"
            )
            if self._replay_handler is not None:
                for event in self._events_batch:
                    # Let's put back the _id field from @metadata._id
                    if "@metadata" in event and "_id" in event["@metadata"]:
                        event["_id"] = event["@metadata"]["_id"]
                        del event["@metadata"]

                    self._replay_handler(self._logstash_url, self._replay_args, event)
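
# --- Illustrative usage sketch -------------------------------------------------
# A minimal sketch of how the shipper is driven, kept as comments so the module
# stays unchanged. The endpoint, credentials, and tag below are placeholders,
# and the sketch assumes the `share` and `shippers.shipper` imports above
# resolve in the surrounding project.
#
# shipper = LogstashShipper(
#     logstash_url="https://logstash.example.com:8080",
#     username="forwarder",
#     password="changeme",
#     max_batch_size=2,
#     compression_level=6,
#     tags=["preprod"],
# )
#
# # Events are buffered until max_batch_size is reached, then PUT to Logstash
# # as a single gzip-compressed NDJSON payload.
# assert shipper.send({"_id": "1", "message": "first"}) == _EVENT_BUFFERED
# assert shipper.send({"_id": "2", "message": "second"}) == _EVENT_SENT
#
# # flush() drains any partially filled batch, e.g. before the process exits.
# shipper.send({"_id": "3", "message": "third"})
# shipper.flush()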