export function pipeTo()

in packages/dubbo/src/protocol/async-iterable.ts [210:339]


export function pipeTo<T1, T2, T3, T4, T5, T6, T7>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  transform3: AsyncIterableTransform<T3, T4>,
  transform4: AsyncIterableTransform<T4, T5>,
  transform5: AsyncIterableTransform<T5, T6>,
  sink: AsyncIterableSink<T6, T7>,
  options?: PipeOptions
): Promise<T7>;
/**
 * Takes an asynchronous iterable as a source, applies six transformations
 * in the order given, and passes the result to a sink.
 *
 * Each transform consumes the element type produced by the stage before it
 * (T1 -> T2 -> ... -> T7). The sink consumes T7 elements, and the returned
 * promise carries the sink's result, T8.
 *
 * @param iterable - The source of T1 elements.
 * @param sink - Consumes the output of the last transform.
 * @param options - Optional pipe settings. If `propagateDownStreamError` is
 *   `true`, the source is wrapped so that a rejection from the sink is
 *   handed to the wrapper's `abort()` before the rejection is passed on.
 * @returns A promise for the value produced by the sink.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export function pipeTo<T1, T2, T3, T4, T5, T6, T7, T8>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  transform3: AsyncIterableTransform<T3, T4>,
  transform4: AsyncIterableTransform<T4, T5>,
  transform5: AsyncIterableTransform<T5, T6>,
  transform6: AsyncIterableTransform<T6, T7>,
  sink: AsyncIterableSink<T7, T8>,
  options?: PipeOptions
): Promise<T8>;
/**
 * Takes an asynchronous iterable as a source, applies seven transformations
 * in the order given, and passes the result to a sink.
 *
 * Each transform consumes the element type produced by the stage before it
 * (T1 -> T2 -> ... -> T8). The sink consumes T8 elements, and the returned
 * promise carries the sink's result, T9.
 *
 * @param iterable - The source of T1 elements.
 * @param sink - Consumes the output of the last transform.
 * @param options - Optional pipe settings. If `propagateDownStreamError` is
 *   `true`, the source is wrapped so that a rejection from the sink is
 *   handed to the wrapper's `abort()` before the rejection is passed on.
 * @returns A promise for the value produced by the sink.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export function pipeTo<T1, T2, T3, T4, T5, T6, T7, T8, T9>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  transform3: AsyncIterableTransform<T3, T4>,
  transform4: AsyncIterableTransform<T4, T5>,
  transform5: AsyncIterableTransform<T5, T6>,
  transform6: AsyncIterableTransform<T6, T7>,
  transform7: AsyncIterableTransform<T7, T8>,
  sink: AsyncIterableSink<T8, T9>,
  options?: PipeOptions
): Promise<T9>;
/**
 * Takes an asynchronous iterable as a source, applies eight transformations
 * in the order given, and passes the result to a sink.
 *
 * Each transform consumes the element type produced by the stage before it
 * (T1 -> T2 -> ... -> T9). The sink consumes T9 elements, and the returned
 * promise carries the sink's result, T10.
 *
 * @param iterable - The source of T1 elements.
 * @param sink - Consumes the output of the last transform.
 * @param options - Optional pipe settings. If `propagateDownStreamError` is
 *   `true`, the source is wrapped so that a rejection from the sink is
 *   handed to the wrapper's `abort()` before the rejection is passed on.
 * @returns A promise for the value produced by the sink.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export function pipeTo<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  transform3: AsyncIterableTransform<T3, T4>,
  transform4: AsyncIterableTransform<T4, T5>,
  transform5: AsyncIterableTransform<T5, T6>,
  transform6: AsyncIterableTransform<T6, T7>,
  transform7: AsyncIterableTransform<T7, T8>,
  transform8: AsyncIterableTransform<T8, T9>,
  sink: AsyncIterableSink<T9, T10>,
  options?: PipeOptions
): Promise<T10>;
// prettier-ignore
/**
 * Takes an asynchronous iterable as a source, applies nine transformations
 * in the order given, and passes the result to a sink.
 *
 * Each transform consumes the element type produced by the stage before it
 * (T1 -> T2 -> ... -> T10). The sink consumes T10 elements, and the returned
 * promise carries the sink's result, T11.
 *
 * @param iterable - The source of T1 elements.
 * @param sink - Consumes the output of the last transform.
 * @param options - Optional pipe settings. If `propagateDownStreamError` is
 *   `true`, the source is wrapped so that a rejection from the sink is
 *   handed to the wrapper's `abort()` before the rejection is passed on.
 * @returns A promise for the value produced by the sink.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export function pipeTo<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>(
    iterable: AsyncIterable<T1>,
    transform1: AsyncIterableTransform<T1, T2>,
    transform2: AsyncIterableTransform<T2, T3>,
    transform3: AsyncIterableTransform<T3, T4>,
    transform4: AsyncIterableTransform<T4, T5>,
    transform5: AsyncIterableTransform<T5, T6>,
    transform6: AsyncIterableTransform<T6, T7>,
    transform7: AsyncIterableTransform<T7, T8>,
    transform8: AsyncIterableTransform<T8, T9>,
    transform9: AsyncIterableTransform<T9, T10>,
    sink: AsyncIterableSink<T10, T11>,
    options?: PipeOptions
): Promise<T11>;
// prettier-ignore
/**
 * Takes an asynchronous iterable as a source, applies ten transformations
 * in the order given, and passes the result to a sink.
 *
 * Each transform consumes the element type produced by the stage before it
 * (T1 -> T2 -> ... -> T11). The sink consumes T11 elements, and the returned
 * promise carries the sink's result, T12.
 *
 * @param iterable - The source of T1 elements.
 * @param sink - Consumes the output of the last transform.
 * @param options - Optional pipe settings. If `propagateDownStreamError` is
 *   `true`, the source is wrapped so that a rejection from the sink is
 *   handed to the wrapper's `abort()` before the rejection is passed on.
 * @returns A promise for the value produced by the sink.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export function pipeTo<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>(
    iterable: AsyncIterable<T1>,
    transform1: AsyncIterableTransform<T1, T2>,
    transform2: AsyncIterableTransform<T2, T3>,
    transform3: AsyncIterableTransform<T3, T4>,
    transform4: AsyncIterableTransform<T4, T5>,
    transform5: AsyncIterableTransform<T5, T6>,
    transform6: AsyncIterableTransform<T6, T7>,
    transform7: AsyncIterableTransform<T7, T8>,
    transform8: AsyncIterableTransform<T8, T9>,
    transform9: AsyncIterableTransform<T9, T10>,
    transform10: AsyncIterableTransform<T10, T11>,
    sink: AsyncIterableSink<T11, T12>,
    options?: PipeOptions
): Promise<T12>;
/**
 * Untyped implementation behind the `pipeTo` overloads.
 *
 * The trailing arguments are split into transforms, a sink and optional
 * pipe options. The source is run through the transforms and handed to the
 * sink, whose promise is returned. If the sink rejects and error propagation
 * was requested, the wrapped source's `abort()` is called with the rejection
 * reason before that reason is re-raised.
 */
export function pipeTo(
  source: AsyncIterable<unknown>,
  ...rest: unknown[]
): Promise<unknown> {
  const [transforms, sink, opt] = pickTransformsAndSink(rest);

  // Wrap the source only when the caller opted in to propagating
  // downstream errors; otherwise there is nothing to abort later.
  const abortable =
    opt?.propagateDownStreamError === true
      ? makeIterableAbortable(source)
      : undefined;

  let stream: AsyncIterable<unknown> = abortable ?? source;

  // The inner pipe must not do its own propagation: this function handles
  // it once, in the rejection handler below.
  // eslint-disable-next-line @typescript-eslint/ban-ts-comment
  // @ts-ignore
  stream = pipe(stream, ...transforms, { propagateDownStreamError: false });

  return sink(stream).catch((reason) => {
    if (!abortable) {
      return Promise.reject(reason);
    }
    // Let the source observe the failure first, then re-raise it.
    return abortable.abort(reason).then(() => Promise.reject(reason));
  });
}