in streaming.ts [103:149]
/**
 * Builds a `Stream` whose items are produced by running `JSON.parse` on every
 * non-empty line decoded from `readableStream`.
 *
 * The returned stream is single-use: invoking its iterator a second time
 * throws, and the error message points callers at `.tee()` instead.
 *
 * NOTE(review): the exact line-splitting rules live in `LineDecoder`, which is
 * not visible here — presumably newline-delimited JSON; confirm there.
 *
 * @param readableStream - Byte source; its chunks are fed to a `LineDecoder`.
 * @param controller - Aborted whenever iteration stops before the source is
 *   exhausted (consumer `break`/`return`, or an error while reading/parsing).
 * @returns A `Stream` wrapping the line-by-line JSON iterator and `controller`.
 * @throws Error if the stream is iterated more than once; also rethrows any
 *   non-abort error raised while reading or parsing (e.g. from `JSON.parse`).
 */
static fromReadableStream<Item>(
  readableStream: ReadableStream,
  controller: AbortController,
) {
  // Single-use guard shared by every invocation of `iterator` below.
  let consumed = false;

  // Yields every line decoded from the incoming chunks, followed by the lines
  // returned by the decoder's `flush()` once the source is exhausted.
  async function* iterLines(): AsyncGenerator<string, void, unknown> {
    const lineDecoder = new LineDecoder();
    const iter = readableStreamAsyncIterable<Bytes>(readableStream);
    for await (const chunk of iter) {
      for (const line of lineDecoder.decode(chunk)) {
        yield line;
      }
    }
    for (const line of lineDecoder.flush()) {
      yield line;
    }
  }

  async function* iterator(): AsyncIterator<Item, any, undefined> {
    if (consumed) {
      throw new Error(
        "Cannot iterate over a consumed stream, use `.tee()` to split the stream.",
      );
    }
    consumed = true;

    // Flips to true only after every line has been consumed; `finally` uses it
    // to tell a normal completion apart from an early exit or an error.
    let done = false;
    try {
      for await (const line of iterLines()) {
        // Empty lines carry no payload and are skipped.
        if (line) yield JSON.parse(line);
      }
      done = true;
    } catch (e) {
      // If the user calls `stream.controller.abort()`, we should exit without throwing.
      if (e instanceof Error && e.name === "AbortError") return;
      throw e;
    } finally {
      // If the user `break`s (or an error escaped above), abort the ongoing request.
      if (!done) controller.abort();
    }
  }

  return new Stream(iterator, controller);
}