in handlers/aws/utils.py [0:0]
def get_shipper_from_input(event_input: Input) -> CompositeShipper:
composite_shipper: CompositeShipper = CompositeShipper()
for output_destination in event_input.get_output_destinations():
output: Optional[Output] = event_input.get_output_by_destination(output_destination)
assert output is not None
if output.type == "elasticsearch":
shared_logger.debug("setting ElasticSearch shipper")
elasticsearch_shipper: ProtocolShipper = ShipperFactory.create_from_output(
output_type="elasticsearch", output=output
)
composite_shipper.add_shipper(shipper=elasticsearch_shipper)
if output.type == "logstash":
shared_logger.debug("setting Logstash shipper")
logstash_shipper: ProtocolShipper = ShipperFactory.create_from_output(output_type="logstash", output=output)
composite_shipper.add_shipper(shipper=logstash_shipper)
replay_handler = ReplayEventHandler(event_input=event_input)
composite_shipper.set_replay_handler(replay_handler=replay_handler.replay_handler)
if event_input.type == "cloudwatch-logs":
composite_shipper.set_event_id_generator(event_id_generator=cloudwatch_logs_object_id)
elif event_input.type == "sqs":
composite_shipper.set_event_id_generator(event_id_generator=sqs_object_id)
elif event_input.type == "s3-sqs":
composite_shipper.set_event_id_generator(event_id_generator=s3_object_id)
elif event_input.type == "kinesis-data-stream":
composite_shipper.set_event_id_generator(event_id_generator=kinesis_record_id)
composite_shipper.add_include_exclude_filter(event_input.include_exclude_filter)
return composite_shipper