in src/openai/lib/streaming/responses/_responses.py [0:0]
def handle_event(self, event: RawResponseStreamEvent) -> List[ResponseStreamEvent[TextFormatT]]:
self.__current_snapshot = snapshot = self.accumulate_event(event)
events: List[ResponseStreamEvent[TextFormatT]] = []
if event.type == "response.output_text.delta":
output = snapshot.output[event.output_index]
assert output.type == "message"
content = output.content[event.content_index]
assert content.type == "output_text"
events.append(
build(
ResponseTextDeltaEvent,
content_index=event.content_index,
delta=event.delta,
item_id=event.item_id,
output_index=event.output_index,
type="response.output_text.delta",
snapshot=content.text,
)
)
elif event.type == "response.output_text.done":
output = snapshot.output[event.output_index]
assert output.type == "message"
content = output.content[event.content_index]
assert content.type == "output_text"
events.append(
build(
ResponseTextDoneEvent[TextFormatT],
content_index=event.content_index,
item_id=event.item_id,
output_index=event.output_index,
type="response.output_text.done",
text=event.text,
parsed=parse_text(event.text, text_format=self._text_format),
)
)
elif event.type == "response.function_call_arguments.delta":
output = snapshot.output[event.output_index]
assert output.type == "function_call"
events.append(
build(
ResponseFunctionCallArgumentsDeltaEvent,
delta=event.delta,
item_id=event.item_id,
output_index=event.output_index,
type="response.function_call_arguments.delta",
snapshot=output.arguments,
)
)
elif event.type == "response.completed":
response = self._completed_response
assert response is not None
events.append(
build(
ResponseCompletedEvent,
type="response.completed",
response=response,
)
)
else:
events.append(event)
return events