def _emit_sse_event()

in src/openai/lib/streaming/_assistants.py [0:0]


    def _emit_sse_event(self, event: AssistantStreamEvent) -> None:
        self._current_event = event
        self.on_event(event)

        self.__current_message_snapshot, new_content = accumulate_event(
            event=event,
            current_message_snapshot=self.__current_message_snapshot,
        )
        if self.__current_message_snapshot is not None:
            self.__message_snapshots[self.__current_message_snapshot.id] = self.__current_message_snapshot

        accumulate_run_step(
            event=event,
            run_step_snapshots=self.__run_step_snapshots,
        )

        for content_delta in new_content:
            assert self.__current_message_snapshot is not None

            block = self.__current_message_snapshot.content[content_delta.index]
            if block.type == "text":
                self.on_text_created(block.text)

        if (
            event.event == "thread.run.completed"
            or event.event == "thread.run.cancelled"
            or event.event == "thread.run.expired"
            or event.event == "thread.run.failed"
            or event.event == "thread.run.requires_action"
            or event.event == "thread.run.incomplete"
        ):
            self.__current_run = event.data
            if self._current_tool_call:
                self.on_tool_call_done(self._current_tool_call)
        elif (
            event.event == "thread.run.created"
            or event.event == "thread.run.in_progress"
            or event.event == "thread.run.cancelling"
            or event.event == "thread.run.queued"
        ):
            self.__current_run = event.data
        elif event.event == "thread.message.created":
            self.on_message_created(event.data)
        elif event.event == "thread.message.delta":
            snapshot = self.__current_message_snapshot
            assert snapshot is not None

            message_delta = event.data.delta
            if message_delta.content is not None:
                for content_delta in message_delta.content:
                    if content_delta.type == "text" and content_delta.text:
                        snapshot_content = snapshot.content[content_delta.index]
                        assert snapshot_content.type == "text"
                        self.on_text_delta(content_delta.text, snapshot_content.text)

                    # If the delta is for a new message content:
                    # - emit on_text_done/on_image_file_done for the previous message content
                    # - emit on_text_created/on_image_created for the new message content
                    if content_delta.index != self._current_message_content_index:
                        if self._current_message_content is not None:
                            if self._current_message_content.type == "text":
                                self.on_text_done(self._current_message_content.text)
                            elif self._current_message_content.type == "image_file":
                                self.on_image_file_done(self._current_message_content.image_file)

                        self._current_message_content_index = content_delta.index
                        self._current_message_content = snapshot.content[content_delta.index]

                    # Update the current_message_content (delta event is correctly emitted already)
                    self._current_message_content = snapshot.content[content_delta.index]

            self.on_message_delta(event.data.delta, snapshot)
        elif event.event == "thread.message.completed" or event.event == "thread.message.incomplete":
            self.__current_message_snapshot = event.data
            self.__message_snapshots[event.data.id] = event.data

            if self._current_message_content_index is not None:
                content = event.data.content[self._current_message_content_index]
                if content.type == "text":
                    self.on_text_done(content.text)
                elif content.type == "image_file":
                    self.on_image_file_done(content.image_file)

            self.on_message_done(event.data)
        elif event.event == "thread.run.step.created":
            self.__current_run_step_id = event.data.id
            self.on_run_step_created(event.data)
        elif event.event == "thread.run.step.in_progress":
            self.__current_run_step_id = event.data.id
        elif event.event == "thread.run.step.delta":
            step_snapshot = self.__run_step_snapshots[event.data.id]

            run_step_delta = event.data.delta
            if (
                run_step_delta.step_details
                and run_step_delta.step_details.type == "tool_calls"
                and run_step_delta.step_details.tool_calls is not None
            ):
                assert step_snapshot.step_details.type == "tool_calls"
                for tool_call_delta in run_step_delta.step_details.tool_calls:
                    if tool_call_delta.index == self._current_tool_call_index:
                        self.on_tool_call_delta(
                            tool_call_delta,
                            step_snapshot.step_details.tool_calls[tool_call_delta.index],
                        )

                    # If the delta is for a new tool call:
                    # - emit on_tool_call_done for the previous tool_call
                    # - emit on_tool_call_created for the new tool_call
                    if tool_call_delta.index != self._current_tool_call_index:
                        if self._current_tool_call is not None:
                            self.on_tool_call_done(self._current_tool_call)

                        self._current_tool_call_index = tool_call_delta.index
                        self._current_tool_call = step_snapshot.step_details.tool_calls[tool_call_delta.index]
                        self.on_tool_call_created(self._current_tool_call)

                    # Update the current_tool_call (delta event is correctly emitted already)
                    self._current_tool_call = step_snapshot.step_details.tool_calls[tool_call_delta.index]

            self.on_run_step_delta(
                event.data.delta,
                step_snapshot,
            )
        elif (
            event.event == "thread.run.step.completed"
            or event.event == "thread.run.step.cancelled"
            or event.event == "thread.run.step.expired"
            or event.event == "thread.run.step.failed"
        ):
            if self._current_tool_call:
                self.on_tool_call_done(self._current_tool_call)

            self.on_run_step_done(event.data)
            self.__current_run_step_id = None
        elif event.event == "thread.created" or event.event == "thread.message.in_progress" or event.event == "error":
            # currently no special handling
            ...
        else:
            # we only want to error at build-time
            if TYPE_CHECKING:  # type: ignore[unreachable]
                assert_never(event)

        self._current_event = None