in shippers/es.py [0:0]
def flush(self) -> None:
if len(self._bulk_actions) == 0:
return
errors = es_bulk(self._es_client, self._bulk_actions, **self._bulk_kwargs)
failed = self._handle_outcome(actions=self._bulk_actions, errors=errors)
# Send failed requests to dead letter index, if enabled
if len(failed) > 0 and self._es_dead_letter_index:
failed = self._send_dead_letter_index(failed)
# Send remaining failed requests to replay queue, if enabled
if isinstance(failed, list) and len(failed) > 0 and self._replay_handler is not None:
for outcome in failed:
if "action" not in outcome:
shared_logger.error("action could not be extracted to be replayed", extra={"outcome": outcome})
continue
self._replay_handler(self._output_destination, self._replay_args, outcome["action"])
self._bulk_actions = []
return