def _send()

in shippers/logstash.py [0:0]


    def _send(self) -> None:
        ndjson = "\n".join(json_dumper(event) for event in self._events_batch)

        try:
            response = self._session.put(
                self._logstash_url,
                data=gzip.compress(ndjson.encode("utf-8"), self._compression_level),
                headers={"Content-Encoding": "gzip", "Content-Type": "application/x-ndjson"},
                timeout=_TIMEOUT,
            )

            if response.status_code == 401:
                raise RequestException("Authentication error")
        except RequestException as e:
            shared_logger.error(
                f"logstash shipper encountered an error while publishing events to logstash. Error: {str(e)}"
            )

            if self._replay_handler is not None:
                for event in self._events_batch:
                    # let's put back the _id field from @metadata._id
                    if "@metadata" in event and "_id" in event["@metadata"]:
                        event["_id"] = event["@metadata"]["_id"]
                        del event["@metadata"]

                    self._replay_handler(self._logstash_url, self._replay_args, event)