in src/js/node/stream.ts [1141:1307]
function eos(stream, options, callback) {
var _options$readable, _options$writable;
if (arguments.length === 2) {
callback = options;
options = {};
} else if (options == null) {
options = {};
} else {
validateObject(options, "options");
}
validateFunction(callback, "callback");
validateAbortSignal(options.signal, "options.signal");
callback = once(callback);
const readable =
(_options$readable = options.readable) !== null && _options$readable !== void 0
? _options$readable
: isReadableNodeStream(stream);
const writable =
(_options$writable = options.writable) !== null && _options$writable !== void 0
? _options$writable
: isWritableNodeStream(stream);
if (!isNodeStream(stream)) {
throw new ERR_INVALID_ARG_TYPE("stream", "Stream", stream);
}
const wState = stream._writableState;
const rState = stream._readableState;
const onlegacyfinish = () => {
if (!stream.writable) {
onfinish();
}
};
let willEmitClose =
_willEmitClose(stream) &&
isReadableNodeStream(stream) === readable &&
isWritableNodeStream(stream) === writable;
let writableFinished = isWritableFinished(stream, false);
const onfinish = () => {
writableFinished = true;
if (stream.destroyed) {
willEmitClose = false;
}
if (willEmitClose && (!stream.readable || readable)) {
return;
}
if (!readable || readableFinished) {
callback.$call(stream);
}
};
let readableFinished = isReadableFinished(stream, false);
const onend = () => {
readableFinished = true;
if (stream.destroyed) {
willEmitClose = false;
}
if (willEmitClose && (!stream.writable || writable)) {
return;
}
if (!writable || writableFinished) {
callback.$call(stream);
}
};
const onerror = err => {
callback.$call(stream, err);
};
let closed = isClosed(stream);
const onclose = () => {
closed = true;
const errored = isWritableErrored(stream) || isReadableErrored(stream);
if (errored && typeof errored !== "boolean") {
return callback.$call(stream, errored);
}
if (readable && !readableFinished && isReadableNodeStream(stream, true)) {
if (!isReadableFinished(stream, false)) return callback.$call(stream, new ERR_STREAM_PREMATURE_CLOSE());
}
if (writable && !writableFinished) {
if (!isWritableFinished(stream, false)) return callback.$call(stream, new ERR_STREAM_PREMATURE_CLOSE());
}
callback.$call(stream);
};
const onrequest = () => {
stream.req.on("finish", onfinish);
};
if (isRequest(stream)) {
stream.on("complete", onfinish);
if (!willEmitClose) {
stream.on("abort", onclose);
}
if (stream.req) {
onrequest();
} else {
stream.on("request", onrequest);
}
} else if (writable && !wState) {
stream.on("end", onlegacyfinish);
stream.on("close", onlegacyfinish);
}
if (!willEmitClose && typeof stream.aborted === "boolean") {
stream.on("aborted", onclose);
}
stream.on("end", onend);
stream.on("finish", onfinish);
if (options.error !== false) {
stream.on("error", onerror);
}
stream.on("close", onclose);
if (closed) {
ProcessNextTick(onclose);
} else if (
(wState !== null && wState !== void 0 && wState.errorEmitted) ||
(rState !== null && rState !== void 0 && rState.errorEmitted)
) {
if (!willEmitClose) {
ProcessNextTick(onclose);
}
} else if (
!readable &&
(!willEmitClose || isReadable(stream)) &&
(writableFinished || isWritable(stream) === false)
) {
ProcessNextTick(onclose);
} else if (
!writable &&
(!willEmitClose || isWritable(stream)) &&
(readableFinished || isReadable(stream) === false)
) {
ProcessNextTick(onclose);
} else if (rState && stream.req && stream.aborted) {
ProcessNextTick(onclose);
}
const cleanup = () => {
callback = nop;
stream.removeListener("aborted", onclose);
stream.removeListener("complete", onfinish);
stream.removeListener("abort", onclose);
stream.removeListener("request", onrequest);
if (stream.req) stream.req.removeListener("finish", onfinish);
stream.removeListener("end", onlegacyfinish);
stream.removeListener("close", onlegacyfinish);
stream.removeListener("finish", onfinish);
stream.removeListener("end", onend);
stream.removeListener("error", onerror);
stream.removeListener("close", onclose);
};
if (options.signal && !closed) {
const abort = () => {
const endCallback = callback;
cleanup();
endCallback.$call(
stream,
new AbortError(void 0, {
cause: options.signal.reason,
}),
);
};
if (options.signal.aborted) {
ProcessNextTick(abort);
} else {
const originalCallback = callback;
callback = once((...args) => {
options.signal.removeEventListener("abort", abort);
originalCallback.$apply(stream, args);
});
options.signal.addEventListener("abort", abort);
}
}
return cleanup;
}