in shippers/es.py [0:0]
def _handle_outcome(self, actions: list[dict[str, Any]], errors: tuple[int, Union[int, list[Any]]]) -> list[Any]:
    """Match reported errors back to the submitted actions and collect the real failures.

    Args:
        actions: the actions that were submitted; each one is looked up by its ``"_id"`` key.
        errors: a ``(success_count, error_items)`` pair. ``error_items`` must be a list whose
            items carry a ``"create"`` dict with ``"_id"`` and ``"error"`` keys and,
            optionally, a ``"status"`` key.

    Returns:
        One dict per failed action: ``{"action": <the action>}`` merged with whatever
        ``self._parse_error`` returns for the error's ``"create"`` entry. Errors that cannot
        be tied to exactly one action, and errors with a 409 CONFLICT status, are left out.
    """
    # Narrows the Union for type checkers; presumably callers always pass the list form.
    assert isinstance(errors[1], list)

    success = errors[0]
    failed: list[Any] = []

    # Index the actions by _id once instead of rescanning the whole list for every error.
    # The index is only built when there is something to look up, so the no-error path
    # never touches the actions. NOTE(review): assumes _id values are hashable (strings).
    actions_by_id: dict[Any, list[dict[str, Any]]] = {}
    if errors[1]:
        for action in actions:
            actions_by_id.setdefault(action["_id"], []).append(action)

    for error in errors[1]:
        outcome = error["create"]
        matching_actions = actions_by_id.get(outcome["_id"], [])

        # an ingestion pipeline might override the _id, we can only skip in this case
        if len(matching_actions) != 1:
            continue

        # The warning is emitted before the conflict check, so conflicts are logged too.
        shared_logger.warning("elasticsearch shipper", extra={"error": outcome["error"], "_id": outcome["_id"]})

        if outcome.get("status") == http.HTTPStatus.CONFLICT:
            # Skip duplicate events on dead letter index and replay queue
            continue

        failed.append({"action": matching_actions[0]} | self._parse_error(outcome))

    # Same summary payload either way; only the level depends on whether anything failed.
    summary = {"success": success, "failed": len(failed)}
    if failed:
        shared_logger.warning("elasticsearch shipper", extra=summary)
    else:
        shared_logger.info("elasticsearch shipper", extra=summary)

    return failed