in packages/dubbo/src/protocol/async-iterable.ts [410:592]
export function pipe<T1, T2, T3>(
iterable: AsyncIterable<T1>,
transform1: AsyncIterableTransform<T1, T2>,
transform2: AsyncIterableTransform<T2, T3>,
options?: PipeOptions
): AsyncIterable<T3>;
/**
* Apply one or more transformations to an asynchronous iterable.
*
* @private Internal code, does not follow semantic versioning.
*/
export function pipe<T1, T2, T3, T4>(
iterable: AsyncIterable<T1>,
transform1: AsyncIterableTransform<T1, T2>,
transform2: AsyncIterableTransform<T2, T3>,
transform3: AsyncIterableTransform<T3, T4>,
options?: PipeOptions
): AsyncIterable<T4>;
/**
* Apply one or more transformations to an asynchronous iterable.
*
* @private Internal code, does not follow semantic versioning.
*/
export function pipe<T1, T2, T3, T4, T5>(
iterable: AsyncIterable<T1>,
transform1: AsyncIterableTransform<T1, T2>,
transform2: AsyncIterableTransform<T2, T3>,
transform3: AsyncIterableTransform<T3, T4>,
transform4: AsyncIterableTransform<T4, T5>,
options?: PipeOptions
): AsyncIterable<T5>;
/**
* Apply one or more transformations to an asynchronous iterable.
*
* @private Internal code, does not follow semantic versioning.
*/
export function pipe<T1, T2, T3, T4, T5, T6>(
iterable: AsyncIterable<T1>,
transform1: AsyncIterableTransform<T1, T2>,
transform2: AsyncIterableTransform<T2, T3>,
transform3: AsyncIterableTransform<T3, T4>,
transform4: AsyncIterableTransform<T4, T5>,
transform5: AsyncIterableTransform<T5, T6>,
options?: PipeOptions
): AsyncIterable<T6>;
/**
* Apply one or more transformations to an asynchronous iterable.
*
* @private Internal code, does not follow semantic versioning.
*/
export function pipe<T1, T2, T3, T4, T5, T6, T7>(
iterable: AsyncIterable<T1>,
transform1: AsyncIterableTransform<T1, T2>,
transform2: AsyncIterableTransform<T2, T3>,
transform3: AsyncIterableTransform<T3, T4>,
transform4: AsyncIterableTransform<T4, T5>,
transform5: AsyncIterableTransform<T5, T6>,
transform6: AsyncIterableTransform<T6, T7>,
options?: PipeOptions
): AsyncIterable<T7>;
/**
* Apply one or more transformations to an asynchronous iterable.
*
* @private Internal code, does not follow semantic versioning.
*/
export function pipe<T1, T2, T3, T4, T5, T6, T7, T8>(
iterable: AsyncIterable<T1>,
transform1: AsyncIterableTransform<T1, T2>,
transform2: AsyncIterableTransform<T2, T3>,
transform3: AsyncIterableTransform<T3, T4>,
transform4: AsyncIterableTransform<T4, T5>,
transform5: AsyncIterableTransform<T5, T6>,
transform6: AsyncIterableTransform<T6, T7>,
transform7: AsyncIterableTransform<T7, T8>,
options?: PipeOptions
): AsyncIterable<T8>;
/**
* Apply one or more transformations to an asynchronous iterable.
*
* @private Internal code, does not follow semantic versioning.
*/
export function pipe<T1, T2, T3, T4, T5, T6, T7, T8, T9>(
iterable: AsyncIterable<T1>,
transform1: AsyncIterableTransform<T1, T2>,
transform2: AsyncIterableTransform<T2, T3>,
transform3: AsyncIterableTransform<T3, T4>,
transform4: AsyncIterableTransform<T4, T5>,
transform5: AsyncIterableTransform<T5, T6>,
transform6: AsyncIterableTransform<T6, T7>,
transform7: AsyncIterableTransform<T7, T8>,
transform8: AsyncIterableTransform<T8, T9>,
options?: PipeOptions
): AsyncIterable<T9>;
/**
* Apply one or more transformations to an asynchronous iterable.
*
* @private Internal code, does not follow semantic versioning.
*/
export function pipe<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(
iterable: AsyncIterable<T1>,
transform1: AsyncIterableTransform<T1, T2>,
transform2: AsyncIterableTransform<T2, T3>,
transform3: AsyncIterableTransform<T3, T4>,
transform4: AsyncIterableTransform<T4, T5>,
transform5: AsyncIterableTransform<T5, T6>,
transform6: AsyncIterableTransform<T6, T7>,
transform7: AsyncIterableTransform<T7, T8>,
transform8: AsyncIterableTransform<T8, T9>,
transform9: AsyncIterableTransform<T9, T10>,
options?: PipeOptions
): AsyncIterable<T10>;
// prettier-ignore
/**
* Apply one or more transformations to an asynchronous iterable.
*
* @private Internal code, does not follow semantic versioning.
*/
export function pipe<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>(
iterable: AsyncIterable<T1>,
transform1: AsyncIterableTransform<T1, T2>,
transform2: AsyncIterableTransform<T2, T3>,
transform3: AsyncIterableTransform<T3, T4>,
transform4: AsyncIterableTransform<T4, T5>,
transform5: AsyncIterableTransform<T5, T6>,
transform6: AsyncIterableTransform<T6, T7>,
transform7: AsyncIterableTransform<T7, T8>,
transform8: AsyncIterableTransform<T8, T9>,
transform9: AsyncIterableTransform<T9, T10>,
transform10: AsyncIterableTransform<T10, T11>,
options?: PipeOptions
): AsyncIterable<T11>;
// prettier-ignore
/**
* Apply one or more transformations to an asynchronous iterable.
*
* @private Internal code, does not follow semantic versioning.
*/
export function pipe<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>(
iterable: AsyncIterable<T1>,
transform1: AsyncIterableTransform<T1, T2>,
transform2: AsyncIterableTransform<T2, T3>,
transform3: AsyncIterableTransform<T3, T4>,
transform4: AsyncIterableTransform<T4, T5>,
transform5: AsyncIterableTransform<T5, T6>,
transform6: AsyncIterableTransform<T6, T7>,
transform7: AsyncIterableTransform<T7, T8>,
transform8: AsyncIterableTransform<T8, T9>,
transform9: AsyncIterableTransform<T9, T10>,
transform10: AsyncIterableTransform<T10, T11>,
transform11: AsyncIterableTransform<T11, T12>,
options?: PipeOptions
): AsyncIterable<T12>;
export async function* pipe<I, O>(
source: AsyncIterable<I>,
...rest: (AsyncIterableTransform<unknown> | PipeOptions | undefined)[]
): AsyncIterable<O> {
const [transforms, opt] = pickTransforms(rest);
let abortable: Abortable | undefined;
let iterable: AsyncIterable<unknown> = source;
if (opt?.propagateDownStreamError === true) {
iterable = abortable = makeIterableAbortable(iterable);
}
for (const t of transforms) {
iterable = t(iterable);
}
const it = iterable[Symbol.asyncIterator]();
for (;;) {
const r = await it.next();
if (r.done === true) {
break;
}
if (!abortable) {
yield r.value as O;
continue;
}
try {
yield r.value as O;
} catch (e) {
await abortable.abort(e); // propagate downstream error to the source
throw e;
}
}
}