in src/openai/lib/streaming/chat/_completions.py [0:0]
def _accumulate_chunk(self, chunk: ChatCompletionChunk) -> ParsedChatCompletionSnapshot:
completion_snapshot = self.__current_completion_snapshot
if completion_snapshot is None:
return _convert_initial_chunk_into_snapshot(chunk)
for choice in chunk.choices:
try:
choice_snapshot = completion_snapshot.choices[choice.index]
previous_tool_calls = choice_snapshot.message.tool_calls or []
choice_snapshot.message = cast(
ParsedChatCompletionMessageSnapshot,
construct_type(
type_=ParsedChatCompletionMessageSnapshot,
value=accumulate_delta(
cast(
"dict[object, object]",
model_dump(
choice_snapshot.message,
# we don't want to serialise / deserialise our custom properties
# as they won't appear in the delta and we don't want to have to
# continuosly reparse the content
exclude=cast(
# cast required as mypy isn't smart enough to infer `True` here to `Literal[True]`
IncEx,
{
"parsed": True,
"tool_calls": {
idx: {"function": {"parsed_arguments": True}}
for idx, _ in enumerate(choice_snapshot.message.tool_calls or [])
},
},
),
),
),
cast("dict[object, object]", choice.delta.to_dict()),
),
),
)
# ensure tools that have already been parsed are added back into the newly
# constructed message snapshot
for tool_index, prev_tool in enumerate(previous_tool_calls):
new_tool = (choice_snapshot.message.tool_calls or [])[tool_index]
if prev_tool.type == "function":
assert new_tool.type == "function"
new_tool.function.parsed_arguments = prev_tool.function.parsed_arguments
elif TYPE_CHECKING: # type: ignore[unreachable]
assert_never(prev_tool)
except IndexError:
choice_snapshot = cast(
ParsedChoiceSnapshot,
construct_type(
type_=ParsedChoiceSnapshot,
value={
**choice.model_dump(exclude_unset=True, exclude={"delta"}),
"message": choice.delta.to_dict(),
},
),
)
completion_snapshot.choices.append(choice_snapshot)
if choice.finish_reason:
choice_snapshot.finish_reason = choice.finish_reason
if has_parseable_input(response_format=self._response_format, input_tools=self._input_tools):
if choice.finish_reason == "length":
# at the time of writing, `.usage` will always be `None` but
# we include it here in case that is changed in the future
raise LengthFinishReasonError(completion=completion_snapshot)
if choice.finish_reason == "content_filter":
raise ContentFilterFinishReasonError()
if (
choice_snapshot.message.content
and not choice_snapshot.message.refusal
and is_given(self._rich_response_format)
# partial parsing fails on white-space
and choice_snapshot.message.content.lstrip()
):
choice_snapshot.message.parsed = from_json(
bytes(choice_snapshot.message.content, "utf-8"),
partial_mode=True,
)
for tool_call_chunk in choice.delta.tool_calls or []:
tool_call_snapshot = (choice_snapshot.message.tool_calls or [])[tool_call_chunk.index]
if tool_call_snapshot.type == "function":
input_tool = get_input_tool_by_name(
input_tools=self._input_tools, name=tool_call_snapshot.function.name
)
if (
input_tool
and input_tool.get("function", {}).get("strict")
and tool_call_snapshot.function.arguments
):
tool_call_snapshot.function.parsed_arguments = from_json(
bytes(tool_call_snapshot.function.arguments, "utf-8"),
partial_mode=True,
)
elif TYPE_CHECKING: # type: ignore[unreachable]
assert_never(tool_call_snapshot)
if choice.logprobs is not None:
if choice_snapshot.logprobs is None:
choice_snapshot.logprobs = build(
ChoiceLogprobs,
content=choice.logprobs.content,
refusal=choice.logprobs.refusal,
)
else:
if choice.logprobs.content:
if choice_snapshot.logprobs.content is None:
choice_snapshot.logprobs.content = []
choice_snapshot.logprobs.content.extend(choice.logprobs.content)
if choice.logprobs.refusal:
if choice_snapshot.logprobs.refusal is None:
choice_snapshot.logprobs.refusal = []
choice_snapshot.logprobs.refusal.extend(choice.logprobs.refusal)
completion_snapshot.usage = chunk.usage
completion_snapshot.system_fingerprint = chunk.system_fingerprint
return completion_snapshot