in src/anthropic/_streaming.py [0:0]
def __stream__(self) -> Iterator[_T]:
cast_to = cast(Any, self._cast_to)
response = self.response
process_data = self._client._process_response_data
iterator = self._iter_events()
for sse in iterator:
if sse.event == "completion":
yield process_data(data=sse.json(), cast_to=cast_to, response=response)
if (
sse.event == "message_start"
or sse.event == "message_delta"
or sse.event == "message_stop"
or sse.event == "content_block_start"
or sse.event == "content_block_delta"
or sse.event == "content_block_stop"
):
data = sse.json()
if is_dict(data) and "type" not in data:
data["type"] = sse.event
yield process_data(data=data, cast_to=cast_to, response=response)
if sse.event == "ping":
continue
if sse.event == "error":
body = sse.data
try:
body = sse.json()
err_msg = f"{body}"
except Exception:
err_msg = sse.data or f"Error code: {response.status_code}"
raise self._client._make_status_error(
err_msg,
body=body,
response=self.response,
)
# Ensure the entire stream is consumed
for _sse in iterator:
...