in streaming.ts [25:97]
static fromSSEResponse<Item>(
response: Response,
controller: AbortController,
) {
let consumed = false;
async function* iterator(): AsyncIterator<Item, any, undefined> {
if (consumed) {
throw new Error(
"Cannot iterate over a consumed stream, use `.tee()` to split the stream.",
);
}
consumed = true;
let done = false;
try {
for await (const sse of _iterSSEMessages(response, controller)) {
if (done) continue;
if (sse.data.startsWith("[DONE]")) {
done = true;
continue;
}
if (sse.event === null) {
let data;
try {
data = JSON.parse(sse.data);
} catch (e) {
console.error(`Could not parse message into JSON:`, sse.data);
console.error(`From chunk:`, sse.raw);
throw e;
}
if (data && data.error) {
throw new APIError(undefined, data.error, undefined, undefined);
}
yield data;
} else {
let data;
try {
data = JSON.parse(sse.data);
} catch (e) {
console.error(`Could not parse message into JSON:`, sse.data);
console.error(`From chunk:`, sse.raw);
throw e;
}
// TODO: Is this where the error should be thrown?
if (sse.event == "error") {
throw new APIError(
undefined,
data.error,
data.message,
undefined,
);
}
yield { event: sse.event, data: data } as any;
}
}
done = true;
} catch (e) {
// If the user calls `stream.controller.abort()`, we should exit without throwing.
if (e instanceof Error && e.name === "AbortError") return;
throw e;
} finally {
// If the user `break`s, abort the ongoing request.
if (!done) controller.abort();
}
}
return new Stream(iterator, controller);
}