in shippers/es.py [0:0]
def send(self, event: dict[str, Any]) -> str:
    """Queue ``event`` as a bulk ``create`` action, flushing on a full batch.

    The event dict is modified in place: it is enriched and then given the
    ``_op_type``, ``_index`` and ``_id`` bulk metadata keys (the latter two
    only when not already present) before being normalised and buffered.

    Args:
        event: The event payload to ship.

    Returns:
        ``_EVENT_BUFFERED`` while the buffer holds fewer actions than the
        configured batch size; ``_EVENT_SENT`` once the buffer has reached
        that size and ``flush()`` has been called.
    """
    # Keep the replay arguments in sync with the configured datastream name.
    self._replay_args["es_datastream_name"] = self._es_datastream_name

    # No usable index yet (attribute missing or empty string): let dataset
    # discovery inspect this event.
    # NOTE(review): presumably _discover_dataset populates _es_index - confirm.
    index_is_set = hasattr(self, "_es_index") and self._es_index != ""
    if not index_is_set:
        self._discover_dataset(event_payload=event)

    self._enrich_event(event_payload=event)

    # Bulk action metadata; an "_index" or "_id" already on the event wins.
    event["_op_type"] = "create"
    if "_index" not in event:
        event["_index"] = self._es_index
    if "_id" not in event:
        id_generator = self._event_id_generator
        if id_generator is not None:
            event["_id"] = id_generator(event)

    action = normalise_event(event_payload=event)
    self._bulk_actions.append(action)

    # Flush only once the buffer has reached the configured batch size.
    if len(self._bulk_actions) >= self._bulk_batch_size:
        self.flush()
        return _EVENT_SENT

    return _EVENT_BUFFERED