export function makeIterableAbortable()

in packages/dubbo/src/protocol/async-iterable.ts [1228:1284]


/**
 * Wraps an AsyncIterable so that it can be aborted from the outside.
 *
 * The returned object is an AsyncIterable that forwards `next()`, `throw()`
 * and - if the source iterator implements it - `return()` to the iterator
 * obtained from `iterable`. In addition it exposes `abort(reason)`, which
 * calls `throw(reason)` on the source iterator and reports the outcome:
 *
 * - "completed": the source iterator handled the error and finished
 *   (`done === true`).
 * - "caught": the source iterator handled the error but is not done.
 * - "rethrown": the call to `throw()` rejected.
 *
 * If a `next()` call issued through the wrapper is still pending, the abort
 * is deferred until that call has settled. Calling `abort()` more than once
 * returns the promise of the first call; later reasons are ignored.
 *
 * The returned iterable can only be iterated once.
 *
 * @param iterable - The source. Its iterator is acquired immediately and
 * must implement `throw`.
 * @returns An AsyncIterable with an additional `abort()` method.
 * @throws Error if the source iterator does not implement `throw`.
 */
export function makeIterableAbortable<T>(
  iterable: AsyncIterable<T>
): AsyncIterable<T> & Abortable {
  const innerCandidate = iterable[Symbol.asyncIterator]();
  if (innerCandidate.throw === undefined) {
    throw new Error("AsyncIterable does not implement throw");
  }
  // `throw` was verified above; `return` is only accessed through `inner`
  // after checking that it exists on `innerCandidate`.
  const inner = innerCandidate as Required<AsyncIterator<T>>;
  let aborted: { reason: unknown; state: Promise<AbortState> } | undefined;
  // The most recent pending next() call, so that abort() can wait for it.
  let resultPromise: Promise<IteratorResult<T>> | undefined;
  let it: AsyncIterator<T> = {
    next(): Promise<IteratorResult<T>> {
      const pending: Promise<IteratorResult<T>> = inner.next().finally(() => {
        // Only clear the slot if it still refers to this call. With
        // overlapping next() calls, an earlier call settling must not erase
        // the record of a later call that is still pending.
        if (resultPromise === pending) {
          resultPromise = undefined;
        }
      });
      resultPromise = pending;
      return pending;
    },
    throw(e?: unknown): Promise<IteratorResult<T>> {
      return inner.throw(e);
    },
  };
  // Forward return() only when the source iterator provides it. Without this,
  // an early exit from `for await` would never reach the source iterator and
  // its cleanup would be skipped. If the source has no return(), the wrapper
  // must not expose one either, since there is nothing to delegate to.
  if (innerCandidate.return !== undefined) {
    it = {
      ...it,
      return(value?: unknown): Promise<IteratorResult<T>> {
        return inner.return(value);
      },
    };
  }
  let used = false;
  return {
    abort(reason: unknown): Promise<AbortState> {
      if (aborted) {
        // Already aborted: report the outcome of the first abort.
        return aborted.state;
      }
      const f = (): Promise<AbortState> => {
        return inner.throw(reason).then(
          (r) => (r.done === true ? "completed" : "caught"),
          () => "rethrown"
        );
      };
      if (resultPromise) {
        // A next() call is in flight: throw only after it has settled,
        // regardless of whether it resolved or rejected.
        aborted = { reason, state: resultPromise.then(f, f) };
        return aborted.state;
      }
      aborted = { reason, state: f() };
      return aborted.state;
    },
    [Symbol.asyncIterator](): AsyncIterator<T> {
      if (used) {
        throw new Error("AsyncIterable cannot be re-used");
      }
      used = true;
      return it;
    },
  };
}