handlers/aws/replay_trigger.py (54 lines of code) (raw):

# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.

from typing import Any, Optional

from share import Config, ElasticsearchOutput, Input, LogstashOutput, Output, shared_logger
from shippers import CompositeShipper, ProtocolShipper, ShipperFactory

from .exceptions import InputConfigException, OutputConfigException, ReplayHandlerException
from .utils import delete_sqs_record


class ReplayedEventReplayHandler:
    """Track replayed events and acknowledge the ones that did not fail again.

    Every replayed event is registered together with its receipt handle via
    ``add_event_with_receipt_handle``. Events that fail again are reported
    through ``replay_handler``. ``flush`` then hands the receipt handle of
    every event that was *not* reported as failed to ``delete_sqs_record``.
    """

    def __init__(self, replay_queue_arn: str):
        """Create a handler bound to the replay queue identified by ``replay_queue_arn``."""
        self._replay_queue_arn = replay_queue_arn
        # Unique ids of the events reported as failed through ``replay_handler``.
        self._failed_event_ids: list[str] = []
        # Maps the unique id of each registered event to its receipt handle.
        self._events_with_receipt_handle: dict[str, str] = {}

    def add_event_with_receipt_handle(self, event_uniq_id: str, receipt_handle: str) -> None:
        """Register ``receipt_handle`` for ``event_uniq_id``, replacing any previous one."""
        self._events_with_receipt_handle[event_uniq_id] = receipt_handle

    def replay_handler(
        self, output_destination: str, output_args: dict[str, Any], event_payload: dict[str, Any]
    ) -> None:
        """Record a replayed event as failed.

        The unique id is built as ``event_payload["_id"] + output_destination``.
        ``output_args`` is accepted for signature compatibility with the shipper
        replay callback and is not used here.
        """
        event_uniq_id: str = event_payload["_id"] + output_destination
        self._failed_event_ids.append(event_uniq_id)

    def flush(self) -> None:
        """Delete the non-failed events from the replay queue.

        Raises:
            ReplayHandlerException: if at least one event was reported as failed.
        """
        for failed_event_uniq_id in self._failed_event_ids:
            # ``pop`` with a default tolerates an id that was reported more than once
            # or was never registered; a plain ``del`` would raise KeyError and abort
            # the flush before the successfully replayed records get deleted.
            self._events_with_receipt_handle.pop(failed_event_uniq_id, None)

        for receipt_handle in self._events_with_receipt_handle.values():
            delete_sqs_record(self._replay_queue_arn, receipt_handle)

        if self._failed_event_ids:
            raise ReplayHandlerException()


def get_shipper_for_replay_event(
    config: Config,
    output_destination: str,
    output_args: dict[str, Any],
    event_input_id: str,
    replay_handler: ReplayedEventReplayHandler,
) -> Optional[CompositeShipper]:
    """Build the composite shipper used to replay an event to its original output.

    Args:
        config: configuration from which the input is looked up by id.
        output_destination: destination used to look up the output on the input.
        output_args: output arguments stored with the replayed event; for an
            ``elasticsearch`` output it must contain ``es_datastream_name``.
        event_input_id: id of the input the event belongs to.
        replay_handler: handler whose ``replay_handler`` method is set as the
            replay callback of the returned shipper.

    Returns:
        A ``CompositeShipper`` wrapping the shipper created for the output, or
        ``None`` when the output type is neither ``elasticsearch`` nor ``logstash``.

    Raises:
        InputConfigException: if no input exists for ``event_input_id``.
        OutputConfigException: if the input has no output for ``output_destination``.
    """
    event_input: Optional[Input] = config.get_input_by_id(event_input_id)
    if event_input is None:
        raise InputConfigException(f"Cannot load input for input id {event_input_id}")

    output: Optional[Output] = event_input.get_output_by_destination(output_destination)
    if output is None:
        raise OutputConfigException(f"Cannot load output with destination {output_destination}")

    # Let's wrap the specific output shipper in the composite one, since the composite deepcopy the mutating events
    shipper: CompositeShipper = CompositeShipper()

    if output.type == "elasticsearch":
        assert isinstance(output, ElasticsearchOutput)
        # The datastream name travels with the replayed event; set it on the output before creating the shipper.
        output.es_datastream_name = output_args["es_datastream_name"]
        shared_logger.debug("setting ElasticSearch shipper")
    elif output.type == "logstash":
        assert isinstance(output, LogstashOutput)
        shared_logger.debug("setting Logstash shipper")
    else:
        # Unsupported output type: nothing to replay to.
        return None

    protocol_shipper: ProtocolShipper = ShipperFactory.create_from_output(output_type=output.type, output=output)
    shipper.add_shipper(protocol_shipper)
    shipper.set_replay_handler(replay_handler=replay_handler.replay_handler)

    return shipper