export function pipe()

in packages/dubbo/src/protocol/async-iterable.ts [421:592]


export function pipe<T1, T2, T3, T4>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  transform3: AsyncIterableTransform<T3, T4>,
  options?: PipeOptions
): AsyncIterable<T4>;
/**
 * Apply one or more transformations to an asynchronous iterable.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export function pipe<T1, T2, T3, T4, T5>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  transform3: AsyncIterableTransform<T3, T4>,
  transform4: AsyncIterableTransform<T4, T5>,
  options?: PipeOptions
): AsyncIterable<T5>;
/**
 * Apply one or more transformations to an asynchronous iterable.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export function pipe<T1, T2, T3, T4, T5, T6>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  transform3: AsyncIterableTransform<T3, T4>,
  transform4: AsyncIterableTransform<T4, T5>,
  transform5: AsyncIterableTransform<T5, T6>,
  options?: PipeOptions
): AsyncIterable<T6>;
/**
 * Apply one or more transformations to an asynchronous iterable.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export function pipe<T1, T2, T3, T4, T5, T6, T7>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  transform3: AsyncIterableTransform<T3, T4>,
  transform4: AsyncIterableTransform<T4, T5>,
  transform5: AsyncIterableTransform<T5, T6>,
  transform6: AsyncIterableTransform<T6, T7>,
  options?: PipeOptions
): AsyncIterable<T7>;
/**
 * Apply one or more transformations to an asynchronous iterable.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export function pipe<T1, T2, T3, T4, T5, T6, T7, T8>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  transform3: AsyncIterableTransform<T3, T4>,
  transform4: AsyncIterableTransform<T4, T5>,
  transform5: AsyncIterableTransform<T5, T6>,
  transform6: AsyncIterableTransform<T6, T7>,
  transform7: AsyncIterableTransform<T7, T8>,
  options?: PipeOptions
): AsyncIterable<T8>;
/**
 * Apply one or more transformations to an asynchronous iterable.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export function pipe<T1, T2, T3, T4, T5, T6, T7, T8, T9>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  transform3: AsyncIterableTransform<T3, T4>,
  transform4: AsyncIterableTransform<T4, T5>,
  transform5: AsyncIterableTransform<T5, T6>,
  transform6: AsyncIterableTransform<T6, T7>,
  transform7: AsyncIterableTransform<T7, T8>,
  transform8: AsyncIterableTransform<T8, T9>,
  options?: PipeOptions
): AsyncIterable<T9>;
/**
 * Apply one or more transformations to an asynchronous iterable.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export function pipe<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  transform3: AsyncIterableTransform<T3, T4>,
  transform4: AsyncIterableTransform<T4, T5>,
  transform5: AsyncIterableTransform<T5, T6>,
  transform6: AsyncIterableTransform<T6, T7>,
  transform7: AsyncIterableTransform<T7, T8>,
  transform8: AsyncIterableTransform<T8, T9>,
  transform9: AsyncIterableTransform<T9, T10>,
  options?: PipeOptions
): AsyncIterable<T10>;
// prettier-ignore
/**
 * Apply one or more transformations to an asynchronous iterable.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export function pipe<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>(
    iterable: AsyncIterable<T1>,
    transform1: AsyncIterableTransform<T1, T2>,
    transform2: AsyncIterableTransform<T2, T3>,
    transform3: AsyncIterableTransform<T3, T4>,
    transform4: AsyncIterableTransform<T4, T5>,
    transform5: AsyncIterableTransform<T5, T6>,
    transform6: AsyncIterableTransform<T6, T7>,
    transform7: AsyncIterableTransform<T7, T8>,
    transform8: AsyncIterableTransform<T8, T9>,
    transform9: AsyncIterableTransform<T9, T10>,
    transform10: AsyncIterableTransform<T10, T11>,
    options?: PipeOptions
): AsyncIterable<T11>;
// prettier-ignore
/**
 * Apply one or more transformations to an asynchronous iterable.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export function pipe<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>(
    iterable: AsyncIterable<T1>,
    transform1: AsyncIterableTransform<T1, T2>,
    transform2: AsyncIterableTransform<T2, T3>,
    transform3: AsyncIterableTransform<T3, T4>,
    transform4: AsyncIterableTransform<T4, T5>,
    transform5: AsyncIterableTransform<T5, T6>,
    transform6: AsyncIterableTransform<T6, T7>,
    transform7: AsyncIterableTransform<T7, T8>,
    transform8: AsyncIterableTransform<T8, T9>,
    transform9: AsyncIterableTransform<T9, T10>,
    transform10: AsyncIterableTransform<T10, T11>,
    transform11: AsyncIterableTransform<T11, T12>,
    options?: PipeOptions
): AsyncIterable<T12>;
export async function* pipe<I, O>(
  source: AsyncIterable<I>,
  ...rest: (AsyncIterableTransform<unknown> | PipeOptions | undefined)[]
): AsyncIterable<O> {
  const [transforms, opt] = pickTransforms(rest);
  let abortable: Abortable | undefined;
  let iterable: AsyncIterable<unknown> = source;
  if (opt?.propagateDownStreamError === true) {
    iterable = abortable = makeIterableAbortable(iterable);
  }
  for (const t of transforms) {
    iterable = t(iterable);
  }
  const it = iterable[Symbol.asyncIterator]();
  for (;;) {
    const r = await it.next();
    if (r.done === true) {
      break;
    }
    if (!abortable) {
      yield r.value as O;
      continue;
    }
    try {
      yield r.value as O;
    } catch (e) {
      await abortable.abort(e); // propagate downstream error to the source
      throw e;
    }
  }
}