in shippers/logstash.py [0:0]
def _send(self) -> None:
    """Publish the current events batch to Logstash, replaying it on failure.

    The events in ``self._events_batch`` are serialized with ``json_dumper``,
    joined as newline-delimited JSON, gzip-compressed with
    ``self._compression_level`` and sent in a single HTTP PUT to
    ``self._logstash_url``.

    A delivery counts as failed when the session raises a
    ``RequestException`` or when the response carries an HTTP error status:
    401 is reported as "Authentication error", any other 4xx/5xx status is
    reported through ``response.raise_for_status()``.

    On failure the error is logged and, if ``self._replay_handler`` is set,
    every event of the batch is passed to it. A ``RequestException`` raised
    while sending is handled here and is not propagated to the caller.
    """
    ndjson = "\n".join(json_dumper(event) for event in self._events_batch)

    try:
        response = self._session.put(
            self._logstash_url,
            data=gzip.compress(ndjson.encode("utf-8"), self._compression_level),
            headers={"Content-Encoding": "gzip", "Content-Type": "application/x-ndjson"},
            timeout=_TIMEOUT,
        )
        if response.status_code == 401:
            raise RequestException("Authentication error")

        # Any other 4xx/5xx answer also means the batch was not accepted.
        # Without this check such responses were treated as success and the
        # events were dropped without being logged or replayed.
        # raise_for_status() raises an HTTPError, which is a RequestException
        # in the requests library, so it is handled by the clause below.
        response.raise_for_status()
    except RequestException as e:
        shared_logger.error(
            f"logstash shipper encountered an error while publishing events to logstash. Error: {str(e)}"
        )

        if self._replay_handler is not None:
            for event in self._events_batch:
                # let's put back the _id field from @metadata._id
                if "@metadata" in event and "_id" in event["@metadata"]:
                    event["_id"] = event["@metadata"]["_id"]
                    del event["@metadata"]

                self._replay_handler(self._logstash_url, self._replay_args, event)