in packages/dubbo/src/protocol/async-iterable.ts [1228:1284]
export function makeIterableAbortable<T>(
iterable: AsyncIterable<T>
): AsyncIterable<T> & Abortable {
const innerCandidate = iterable[Symbol.asyncIterator]();
if (innerCandidate.throw === undefined) {
throw new Error("AsyncIterable does not implement throw");
}
const inner = innerCandidate as Required<AsyncIterator<T>>;
let aborted: { reason: unknown; state: Promise<AbortState> } | undefined;
let resultPromise: Promise<IteratorResult<T>> | undefined;
let it: AsyncIterator<T> = {
next(): Promise<IteratorResult<T>> {
resultPromise = inner.next().finally(() => {
resultPromise = undefined;
});
return resultPromise;
},
throw(e?: unknown): Promise<IteratorResult<T>> {
return inner.throw(e);
},
};
if (innerCandidate.return === undefined) {
it = {
...it,
return(value?: unknown): Promise<IteratorResult<T>> {
return inner.return(value);
},
};
}
let used = false;
return {
abort(reason: unknown): Promise<AbortState> {
if (aborted) {
return aborted.state;
}
const f = (): Promise<AbortState> => {
return inner.throw(reason).then(
(r) => (r.done === true ? "completed" : "caught"),
() => "rethrown"
);
};
if (resultPromise) {
aborted = { reason, state: resultPromise.then(f, f) };
return aborted.state;
}
aborted = { reason, state: f() };
return aborted.state;
},
[Symbol.asyncIterator](): AsyncIterator<T> {
if (used) {
throw new Error("AsyncIterable cannot be re-used");
}
used = true;
return it;
},
};
}