in src/js/node/stream.ts [5059:5222]
/**
 * Wires an ordered list of stages (Node streams, iterables, or functions) into
 * one pipeline and reports completion through `callback`.
 *
 * @param streams - The stages, in order. When it holds exactly one element and
 *   that element is an array, that array is used as the stage list instead.
 * @param callback - Scheduled as `ProcessNextTick(callback, error, value)` when
 *   the outstanding-completion counter reaches zero. `value` is only assigned
 *   when the final stage is a function whose result is a thenable.
 * @param opts - Optional. `opts.signal` is passed to `validateAbortSignal`; an
 *   "abort" event on it feeds a new `AbortError` into `finishImpl`.
 *   `opts.end === false` makes `end` false for the final stage only.
 * @returns The final value of `ret`: the last stage itself when it is a Node
 *   stream, an object-mode `PassThrough` when the last stage is a function, or
 *   `Duplex.from(stage)` otherwise.
 * @throws ERR_MISSING_ARGS when fewer than two stages are supplied.
 * @throws ERR_INVALID_RETURN_VALUE when a function stage returns a value that
 *   fails the `isIterable` / thenable checks below.
 * @throws ERR_INVALID_ARG_TYPE when a Node stream stage follows a value that is
 *   neither a readable Node stream nor iterable.
 */
function pipelineImpl(streams, callback, opts) {
// Accept both pipeline(a, b, c) and pipeline([a, b, c]).
if (streams.length === 1 && $isJSArray(streams[0])) {
streams = streams[0];
}
if (streams.length < 2) {
throw new ERR_MISSING_ARGS("streams");
}
// Internal controller: its signal is handed to function stages and is aborted
// from finishImpl once the pipeline errors or completes.
const ac = new AbortController();
const signal = ac.signal;
// Caller-supplied signal, or undefined when opts is null/undefined.
const outerSignal = opts === null || opts === void 0 ? void 0 : opts.signal;
// Callbacks run only on successful completion (see finishImpl); they are
// collected for the last stage only.
const lastStreamCleanup = [];
// NOTE(review): presumably throws for a non-signal value — confirm in validateAbortSignal.
validateAbortSignal(outerSignal, "options.signal");
// Abort handler. It calls finishImpl without `final`, so this call records the
// error and runs the destroyers but does not itself schedule `callback`.
function abort() {
finishImpl(new AbortError());
}
outerSignal === null || outerSignal === void 0 ? void 0 : outerSignal.addEventListener("abort", abort);
// First recorded error (subject to the replacement rule in finishImpl).
let error;
// Fulfilment value of a thenable returned by a final function stage.
let value;
// Destroy callbacks gathered from destroyer(); drained in finishImpl.
const destroys = [];
// Number of completion notifications still expected from pipe/pump/then.
let finishCount = 0;
// Completion callback handed to pipe/pump and the thenable handlers. Every call
// decrements the counter; the call that brings it to exactly 0 is `final`.
function finish(err) {
finishImpl(err, --finishCount === 0);
}
// Records the error and, once there is an error or the pipeline is final,
// tears everything down.
function finishImpl(err, final) {
// Keep the first error, except that an already stored
// ERR_STREAM_PREMATURE_CLOSE is replaced by a later error.
if (err && (!error || error.code === "ERR_STREAM_PREMATURE_CLOSE")) {
error = err;
}
// Nothing to do yet: no error so far and more completions are pending.
if (!error && !final) {
return;
}
// Run each destroyer once, in registration order, passing the stored error
// (undefined on a clean final completion).
while (destroys.length) {
destroys.shift()(error);
}
outerSignal === null || outerSignal === void 0 ? void 0 : outerSignal.removeEventListener("abort", abort);
ac.abort();
if (final) {
// Listener cleanup for the last stage happens only on success.
if (!error) {
lastStreamCleanup.forEach(fn => fn());
}
ProcessNextTick(callback, error, value);
}
}
// Output of the previous stage; after the loop, the value returned to the caller.
let ret;
for (let i = 0; i < streams.length; i++) {
const stream = streams[i];
// `reading`: some later stage consumes this one (every stage but the last).
const reading = i < streams.length - 1;
// `writing`: some earlier stage feeds this one (every stage but the first).
const writing = i > 0;
// Always true for non-final stages; for the final stage it is false only when
// opts.end === false.
const end = reading || (opts === null || opts === void 0 ? void 0 : opts.end) !== false;
const isLastStream = i === streams.length - 1;
if (isNodeStream(stream)) {
// Forwards errors emitted by this stream to finish(), ignoring AbortError and
// ERR_STREAM_PREMATURE_CLOSE. Note that finish() also decrements finishCount.
let onError = function (err) {
if (err && err.name !== "AbortError" && err.code !== "ERR_STREAM_PREMATURE_CLOSE") {
finish(err);
}
};
// A destroyer is registered only when `end` is true, i.e. not for a final
// stage with opts.end === false.
if (end) {
const { destroy, cleanup } = destroyer(stream, reading, writing);
destroys.push(destroy);
if (isReadable(stream) && isLastStream) {
lastStreamCleanup.push(cleanup);
}
}
// The error listener is attached regardless of `end`.
stream.on("error", onError);
// For a readable last stage, remove the listener again on successful completion.
if (isReadable(stream) && isLastStream) {
lastStreamCleanup.push(() => {
stream.removeListener("error", onError);
});
}
}
if (i === 0) {
// Source stage.
if (typeof stream === "function") {
// A source function is called with the internal signal and must return
// something isIterable() accepts.
ret = stream({
signal,
});
if (!isIterable(ret)) {
throw new ERR_INVALID_RETURN_VALUE("Iterable, AsyncIterable or Stream", "source", ret);
}
} else if (isIterable(stream) || isReadableNodeStream(stream)) {
ret = stream;
} else {
// Anything else is converted with Duplex.from.
ret = Duplex.from(stream);
}
} else if (typeof stream === "function") {
// Function stage: it receives the previous output, wrapped by
// makeAsyncIterable, plus the internal signal.
ret = makeAsyncIterable(ret);
ret = stream(ret, {
signal,
});
if (reading) {
// Intermediate function stage: its result feeds the next stage.
// NOTE(review): the `true` flag presumably restricts the check to async
// iterables, matching the "AsyncIterable" message — confirm in isIterable.
if (!isIterable(ret, true)) {
throw new ERR_INVALID_RETURN_VALUE("AsyncIterable", `transform[${i - 1}]`, ret);
}
} else {
// Final function stage: an object-mode PassThrough stands in for it so that
// `ret` (the return value) is a stream.
// Temporary used only by the null/undefined guard on `ret` below.
var _ret;
// PassThrough is loaded on first use.
if (!PassThrough) {
PassThrough = require_passthrough();
}
const pt = new PassThrough({
objectMode: true,
});
// `then` is read from `ret` a single time and invoked through that reference.
const then = (_ret = ret) === null || _ret === void 0 ? void 0 : _ret.then;
if (typeof then === "function") {
// Thenable result: one completion is expected from it.
// On fulfilment: remember the value, write it to the proxy unless it is
// null/undefined, end the proxy when `end` is set, then report completion
// on the next tick. On rejection: destroy the proxy with the error and
// report the error on the next tick.
finishCount++;
then.$call(
ret,
val => {
value = val;
if (val != null) {
pt.write(val);
}
if (end) {
pt.end();
}
ProcessNextTick(finish);
},
err => {
pt.destroy(err);
ProcessNextTick(finish, err);
},
);
} else if (isIterable(ret, true)) {
// Iterable result: pump it into the proxy; one completion is expected.
finishCount++;
pump(ret, pt, finish, {
end,
});
} else {
throw new ERR_INVALID_RETURN_VALUE("AsyncIterable or Promise", "destination", ret);
}
ret = pt;
// The proxy gets a destroyer (reading=false, writing=true); unlike the
// Node-stream case above this does not depend on `end`.
const { destroy, cleanup } = destroyer(ret, false, true);
destroys.push(destroy);
if (isLastStream) {
lastStreamCleanup.push(cleanup);
}
}
} else if (isNodeStream(stream)) {
// Node stream stage fed by the previous output.
if (isReadableNodeStream(ret)) {
// Stream-to-stream: two completions are counted for one pipe() call.
// NOTE(review): presumably pipe() calls `finish` once per side — confirm in pipe.
finishCount += 2;
const cleanup = pipe(ret, stream, finish, {
end,
});
if (isReadable(stream) && isLastStream) {
lastStreamCleanup.push(cleanup);
}
} else if (isIterable(ret)) {
// Iterable-to-stream: one completion is expected from pump().
finishCount++;
pump(ret, stream, finish, {
end,
});
} else {
throw new ERR_INVALID_ARG_TYPE("val", ["Readable", "Iterable", "AsyncIterable"], ret);
}
ret = stream;
} else {
// NOTE(review): in this branch the previous `ret` is replaced without being
// connected to the new Duplex by any code visible here — confirm intended.
ret = Duplex.from(stream);
}
}
// If either signal is already aborted once wiring is done, run abort() on the
// next tick.
if (
(signal !== null && signal !== void 0 && signal.aborted) ||
(outerSignal !== null && outerSignal !== void 0 && outerSignal.aborted)
) {
ProcessNextTick(abort);
}
return ret;
}