tools/node-hermes/nodelib/internal/streams/end-of-stream.js (146 lines of code) (raw):

// Ported from https://github.com/mafintosh/end-of-stream with
// permission from the author, Mathias Buus (@mafintosh).

'use strict';

var _require = require('internal/errors'),
  AbortError = _require.AbortError,
  codes = _require.codes;
var ERR_STREAM_PREMATURE_CLOSE = codes.ERR_STREAM_PREMATURE_CLOSE;
var _require2 = require('internal/util'),
  once = _require2.once;
var _require3 = require('internal/validators'),
  validateAbortSignal = _require3.validateAbortSignal,
  validateFunction = _require3.validateFunction,
  validateObject = _require3.validateObject;
var _require4 = require('internal/streams/utils'),
  isClosed = _require4.isClosed,
  isReadable = _require4.isReadable,
  isReadableNodeStream = _require4.isReadableNodeStream,
  isReadableFinished = _require4.isReadableFinished,
  isWritable = _require4.isWritable,
  isWritableNodeStream = _require4.isWritableNodeStream,
  isWritableFinished = _require4.isWritableFinished,
  _willEmitClose = _require4.willEmitClose;

/**
 * Duck-type check used by eos() to pick the request-specific event wiring.
 *
 * @param {object} stream - The object handed to eos().
 * @returns {*} Truthy when `stream` has a truthy `setHeader` property and an
 *   `abort` method; otherwise a falsy value (not necessarily a boolean).
 *   NOTE(review): presumably this identifies http.ClientRequest-like
 *   objects - confirm against callers.
 */
function isRequest(stream) {
  return stream.setHeader && typeof stream.abort === 'function';
}

// Shared no-op; cleanup() installs it as `callback` so late events are ignored.
var nop = function nop() {};

/**
 * Invokes `callback` once when `stream` has ended, finished, errored or was
 * closed prematurely, and returns a function that detaches every listener
 * this call registered.
 *
 * NOTE: the ES5 style (`var`, function expressions) is kept on purpose; the
 * file uses no newer syntax.
 *
 * @param {object} stream - Event-emitter style stream to watch. Its
 *   `_writableState` / `_readableState`, `destroyed`, `readable`, `writable`,
 *   `aborted` and `req` properties are read when present.
 * @param {object} [options] - May be omitted (two-argument form) or nullish.
 * @param {boolean} [options.readable] - Force (true) or disable (false)
 *   waiting for the readable side; by default it is detected from `stream`.
 * @param {boolean} [options.writable] - Same as above for the writable side.
 * @param {boolean} [options.error] - When `false`, no 'error' listener is
 *   registered.
 * @param {object} [options.signal] - Abort signal; when it aborts, `callback`
 *   receives an `AbortError`.
 * @param {Function} callback - Called with `this === stream` and an optional
 *   error argument (stream error, `ERR_STREAM_PREMATURE_CLOSE`, `AbortError`).
 * @returns {Function} cleanup - Removes the stream listeners and the signal
 *   'abort' listener added by this call and disables `callback`.
 *
 * Argument checking is delegated to validateObject / validateFunction /
 * validateAbortSignal (from internal/validators), which are expected to
 * throw on invalid input.
 */
function eos(stream, options, callback) {
  if (arguments.length === 2) {
    callback = options;
    options = {};
  } else if (options == null) {
    options = {};
  } else {
    validateObject(options, 'options');
  }
  validateFunction(callback, 'callback');
  validateAbortSignal(options.signal, 'options.signal');

  // Several events may race to report completion; only the first one counts.
  callback = once(callback);

  var readable = options.readable ||
    options.readable !== false && isReadableNodeStream(stream);
  var writable = options.writable ||
    options.writable !== false && isWritableNodeStream(stream);

  var wState = stream._writableState;
  var rState = stream._readableState;

  // Legacy streams (no _writableState) signal completion through 'end' or
  // 'close' once `stream.writable` has been turned off.
  var onlegacyfinish = function onlegacyfinish() {
    if (!stream.writable) onfinish();
  };

  // TODO (ronag): Improve soft detection to include core modules and
  // common ecosystem modules that do properly emit 'close' but fail
  // this generic check.
  var willEmitClose = _willEmitClose(stream) &&
    isReadableNodeStream(stream) === readable &&
    isWritableNodeStream(stream) === writable;

  var writableFinished = isWritableFinished(stream, false);
  var onfinish = function onfinish() {
    writableFinished = true;
    // Stream should not be destroyed here. If it is that
    // means that user space is doing something differently and
    // we cannot trust willEmitClose.
    if (stream.destroyed) willEmitClose = false;

    // A trustworthy 'close' is coming; let onclose() report completion.
    if (willEmitClose && (!stream.readable || readable)) return;
    if (!readable || readableFinished) callback.call(stream);
  };

  var readableFinished = isReadableFinished(stream, false);
  var onend = function onend() {
    readableFinished = true;
    // Stream should not be destroyed here. If it is that
    // means that user space is doing something differently and
    // we cannot trust willEmitClose.
    if (stream.destroyed) willEmitClose = false;

    // A trustworthy 'close' is coming; let onclose() report completion.
    if (willEmitClose && (!stream.writable || writable)) return;
    if (!writable || writableFinished) callback.call(stream);
  };

  var onerror = function onerror(err) {
    callback.call(stream, err);
  };

  var closed = isClosed(stream);
  var onclose = function onclose() {
    closed = true;

    var errored =
      (wState === null || wState === void 0 ? void 0 : wState.errored) ||
      (rState === null || rState === void 0 ? void 0 : rState.errored);

    // Only forward `errored` when it carries an actual error value rather
    // than a plain boolean flag.
    if (errored && typeof errored !== 'boolean') {
      return callback.call(stream, errored);
    }

    // Closed before a side we are waiting on completed: premature close.
    if (readable && !readableFinished) {
      if (!isReadableFinished(stream, false))
        return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
    }
    if (writable && !writableFinished) {
      if (!isWritableFinished(stream, false))
        return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
    }

    callback.call(stream);
  };

  var onrequest = function onrequest() {
    stream.req.on('finish', onfinish);
  };

  if (isRequest(stream)) {
    stream.on('complete', onfinish);
    if (!willEmitClose) {
      stream.on('abort', onclose);
    }
    if (stream.req) onrequest();
    else stream.on('request', onrequest);
  } else if (writable && !wState) { // legacy streams
    stream.on('end', onlegacyfinish);
    stream.on('close', onlegacyfinish);
  }

  // Not all streams will emit 'close' after 'aborted'.
  if (!willEmitClose && typeof stream.aborted === 'boolean') {
    stream.on('aborted', onclose);
  }

  stream.on('end', onend);
  stream.on('finish', onfinish);
  if (options.error !== false) stream.on('error', onerror);
  stream.on('close', onclose);

  // The stream may already be in a terminal state, in which case no further
  // event will arrive; schedule onclose() asynchronously for those cases.
  if (closed) {
    process.nextTick(onclose);
  } else if (wState !== null && wState !== void 0 && wState.errorEmitted ||
             rState !== null && rState !== void 0 && rState.errorEmitted) {
    if (!willEmitClose) {
      process.nextTick(onclose);
    }
  } else if (!readable && (!willEmitClose || isReadable(stream)) &&
             (writableFinished || !isWritable(stream))) {
    process.nextTick(onclose);
  } else if (!writable && (!willEmitClose || isWritable(stream)) &&
             (readableFinished || !isReadable(stream))) {
    process.nextTick(onclose);
  } else if (rState && stream.req && stream.aborted) {
    process.nextTick(onclose);
  }

  // Detaches the 'abort' listener from options.signal. Stays a no-op unless
  // such a listener is actually attached further down.
  var detachAbortListener = nop;

  var cleanup = function cleanup() {
    callback = nop;
    // Without this, calling cleanup() before completion would leave the
    // abort listener (and everything it closes over) on the signal.
    detachAbortListener();
    stream.removeListener('aborted', onclose);
    stream.removeListener('complete', onfinish);
    stream.removeListener('abort', onclose);
    stream.removeListener('request', onrequest);
    if (stream.req) stream.req.removeListener('finish', onfinish);
    stream.removeListener('end', onlegacyfinish);
    stream.removeListener('close', onlegacyfinish);
    stream.removeListener('finish', onfinish);
    stream.removeListener('end', onend);
    stream.removeListener('error', onerror);
    stream.removeListener('close', onclose);
  };

  if (options.signal && !closed) {
    var abort = function abort() {
      // Keep it because cleanup removes it.
      var endCallback = callback;
      cleanup();
      endCallback.call(stream, new AbortError());
    };
    if (options.signal.aborted) {
      process.nextTick(abort);
    } else {
      var originalCallback = callback;
      detachAbortListener = function () {
        options.signal.removeEventListener('abort', abort);
      };
      // Normal completion must also release the signal listener.
      callback = once(function () {
        detachAbortListener();
        originalCallback.apply(stream, arguments);
      });
      options.signal.addEventListener('abort', abort);
    }
  }

  return cleanup;
}

module.exports = eos;