def send()

in shippers/es.py [0:0]


    def send(self, event: dict[str, Any]) -> str:
        """Queue ``event`` as a bulk action and flush once the batch is full.

        The passed ``event`` dict is modified in place before it is handed to
        ``normalise_event``: ``"_op_type"`` is always set to ``"create"``,
        ``"_index"`` is filled from ``self._es_index`` when absent, and
        ``"_id"`` is filled from ``self._event_id_generator`` when absent and
        a generator is configured.

        Args:
            event: The event payload to ship. Caller-supplied ``"_index"``
                and ``"_id"`` keys are left untouched.

        Returns:
            ``_EVENT_BUFFERED`` when the number of pending bulk actions is
            still below ``self._bulk_batch_size``; otherwise ``_EVENT_SENT``
            after ``self.flush()`` has been called.
        """
        # Record the datastream name in the replay arguments on every call.
        self._replay_args["es_datastream_name"] = self._es_datastream_name

        # A missing or empty index triggers dataset discovery from the event.
        # NOTE(review): presumably _discover_dataset populates _es_index -
        # confirm against its definition.
        if getattr(self, "_es_index", "") == "":
            self._discover_dataset(event_payload=event)

        self._enrich_event(event_payload=event)

        # Bulk action metadata: the op type is forced, index/id only defaulted.
        event["_op_type"] = "create"

        if "_index" not in event:
            event["_index"] = self._es_index

        if "_id" not in event:
            if self._event_id_generator is not None:
                event["_id"] = self._event_id_generator(event)

        normalised = normalise_event(event_payload=event)
        self._bulk_actions.append(normalised)

        if len(self._bulk_actions) < self._bulk_batch_size:
            outcome = _EVENT_BUFFERED
        else:
            self.flush()
            outcome = _EVENT_SENT

        return outcome