tools/node-hermes/nodelib/internal/fs/streams.js (384 lines of code) (raw):

// @nolint 'use strict'; function _typeof(obj) { "@babel/helpers - typeof"; if (typeof Symbol === "function" && typeof Symbol.iterator === "symbol") { _typeof = function _typeof(obj) { return typeof obj; }; } else { _typeof = function _typeof(obj) { return obj && typeof Symbol === "function" && obj.constructor === Symbol && obj !== Symbol.prototype ? "symbol" : typeof obj; }; } return _typeof(obj); } var _primordials = primordials, Array = _primordials.Array, FunctionPrototypeBind = _primordials.FunctionPrototypeBind, MathMin = _primordials.MathMin, ObjectDefineProperty = _primordials.ObjectDefineProperty, ObjectSetPrototypeOf = _primordials.ObjectSetPrototypeOf, PromisePrototypeThen = _primordials.PromisePrototypeThen, ReflectApply = _primordials.ReflectApply, _Symbol = _primordials.Symbol; var _require$codes = require('internal/errors').codes, ERR_INVALID_ARG_TYPE = _require$codes.ERR_INVALID_ARG_TYPE, ERR_OUT_OF_RANGE = _require$codes.ERR_OUT_OF_RANGE, ERR_METHOD_NOT_IMPLEMENTED = _require$codes.ERR_METHOD_NOT_IMPLEMENTED; var _require = require('internal/util'), deprecate = _require.deprecate; var _require2 = require('internal/validators'), validateFunction = _require2.validateFunction, validateInteger = _require2.validateInteger; var _require3 = require('internal/streams/destroy'), errorOrDestroy = _require3.errorOrDestroy; var fs = require('fs'); var _require4 = require('internal/fs/promises'), kRef = _require4.kRef, kUnref = _require4.kUnref, FileHandle = _require4.FileHandle; var _require5 = require('buffer'), Buffer = _require5.Buffer; var _require6 = require('internal/fs/utils'), copyObject = _require6.copyObject, getOptions = _require6.getOptions, getValidatedFd = _require6.getValidatedFd, validatePath = _require6.validatePath; var _require7 = require('stream'), Readable = _require7.Readable, Writable = _require7.Writable, finished = _require7.finished; var _require8 = require('internal/url'), toPathIfFileURL = _require8.toPathIfFileURL; var kIoDone = _Symbol('kIoDone'); var kIsPerformingIO = _Symbol('kIsPerformingIO'); var kFs = _Symbol('kFs'); var kHandle = _Symbol('kHandle'); function _construct(callback) { var stream = this; if (typeof stream.fd === 'number') { callback(); return; } if (stream.open !== openWriteFs && stream.open !== openReadFs) { // Backwards compat for monkey patching open(). var orgEmit = stream.emit; stream.emit = function () { for (var _len = arguments.length, args = new Array(_len), _key = 0; _key < _len; _key++) { args[_key] = arguments[_key]; } if (args[0] === 'open') { this.emit = orgEmit; callback(); ReflectApply(orgEmit, this, args); } else if (args[0] === 'error') { this.emit = orgEmit; callback(args[1]); } else { ReflectApply(orgEmit, this, args); } }; stream.open(); } else { stream[kFs].open(stream.path, stream.flags, stream.mode, function (er, fd) { if (er) { callback(er); } else { stream.fd = fd; callback(); stream.emit('open', stream.fd); stream.emit('ready'); } }); } } // This generates an fs operations structure for a FileHandle var FileHandleOperations = function FileHandleOperations(handle) { return { open: function open(path, flags, mode, cb) { throw new ERR_METHOD_NOT_IMPLEMENTED('open()'); }, close: function close(fd, cb) { handle[kUnref](); PromisePrototypeThen(handle.close(), function () { return cb(); }, cb); }, read: function read(fd, buf, offset, length, pos, cb) { PromisePrototypeThen(handle.read(buf, offset, length, pos), function (r) { return cb(null, r.bytesRead, r.buffer); }, function (err) { return cb(err, 0, buf); }); }, write: function write(fd, buf, offset, length, pos, cb) { PromisePrototypeThen(handle.write(buf, offset, length, pos), function (r) { return cb(null, r.bytesWritten, r.buffer); }, function (err) { return cb(err, 0, buf); }); }, writev: function writev(fd, buffers, pos, cb) { PromisePrototypeThen(handle.writev(buffers, pos), function (r) { return cb(null, r.bytesWritten, r.buffers); }, function (err) { return cb(err, 0, buffers); }); } }; }; function close(stream, err, cb) { if (!stream.fd) { // TODO(ronag) // stream.closed = true; cb(err); } else { stream[kFs].close(stream.fd, function (er) { stream.closed = true; cb(er || err); }); stream.fd = null; } } function importFd(stream, options) { stream.fd = null; if (options.fd != null) { if (typeof options.fd === 'number') { // When fd is a raw descriptor, we must keep our fingers crossed // that the descriptor won't get closed, or worse, replaced with // another one // https://github.com/nodejs/node/issues/35862 stream.fd = options.fd; } else if (_typeof(options.fd) === 'object' && options.fd instanceof FileHandle) { // When fd is a FileHandle we can listen for 'close' events if (options.fs) // FileHandle is not supported with custom fs operations throw new ERR_METHOD_NOT_IMPLEMENTED('FileHandle with fs'); stream[kHandle] = options.fd; stream.fd = options.fd.fd; stream[kFs] = FileHandleOperations(stream[kHandle]); stream[kHandle][kRef](); options.fd.on('close', FunctionPrototypeBind(stream.close, stream)); } else throw ERR_INVALID_ARG_TYPE('options.fd', ['number', 'FileHandle'], options.fd); } } function ReadStream(path, options) { if (!(this instanceof ReadStream)) return new ReadStream(path, options); // A little bit bigger buffer and water marks by default options = copyObject(getOptions(options, {})); if (options.highWaterMark === undefined) options.highWaterMark = 64 * 1024; if (options.autoDestroy === undefined) { options.autoDestroy = false; } this[kFs] = options.fs || fs; validateFunction(this[kFs].open, 'options.fs.open'); validateFunction(this[kFs].read, 'options.fs.read'); validateFunction(this[kFs].close, 'options.fs.close'); options.autoDestroy = options.autoClose === undefined ? true : options.autoClose; // Path will be ignored when fd is specified, so it can be falsy this.path = toPathIfFileURL(path); this.flags = options.flags === undefined ? 'r' : options.flags; this.mode = options.mode === undefined ? 438 : options.mode; importFd(this, options); this.start = options.start; this.end = options.end; this.pos = undefined; this.bytesRead = 0; this.closed = false; this[kIsPerformingIO] = false; if (this.start !== undefined) { validateInteger(this.start, 'start', 0); this.pos = this.start; } // If fd has been set, validate, otherwise validate path. if (this.fd != null) { this.fd = getValidatedFd(this.fd); } else { validatePath(this.path); } if (this.end === undefined) { this.end = Infinity; } else if (this.end !== Infinity) { validateInteger(this.end, 'end', 0); if (this.start !== undefined && this.start > this.end) { throw new ERR_OUT_OF_RANGE('start', "<= \"end\" (here: ".concat(this.end, ")"), this.start); } } ReflectApply(Readable, this, [options]); } ObjectSetPrototypeOf(ReadStream.prototype, Readable.prototype); ObjectSetPrototypeOf(ReadStream, Readable); ObjectDefineProperty(ReadStream.prototype, 'autoClose', { get: function get() { return this._readableState.autoDestroy; }, set: function set(val) { this._readableState.autoDestroy = val; } }); var openReadFs = deprecate(function () {// Noop. }, 'ReadStream.prototype.open() is deprecated', 'DEP0135'); ReadStream.prototype.open = openReadFs; ReadStream.prototype._construct = _construct; ReadStream.prototype._read = function (n) { var _this = this; n = this.pos !== undefined ? MathMin(this.end - this.pos + 1, n) : MathMin(this.end - this.bytesRead + 1, n); if (n <= 0) { this.push(null); return; } var buf = Buffer.allocUnsafeSlow(n); this[kIsPerformingIO] = true; this[kFs].read(this.fd, buf, 0, n, this.pos, function (er, bytesRead, buf) { _this[kIsPerformingIO] = false; // Tell ._destroy() that it's safe to close the fd now. if (_this.destroyed) { _this.emit(kIoDone, er); return; } if (er) { errorOrDestroy(_this, er); } else if (bytesRead > 0) { if (_this.pos !== undefined) { _this.pos += bytesRead; } _this.bytesRead += bytesRead; if (bytesRead !== buf.length) { // Slow path. Shrink to fit. // Copy instead of slice so that we don't retain // large backing buffer for small reads. var dst = Buffer.allocUnsafeSlow(bytesRead); buf.copy(dst, 0, 0, bytesRead); buf = dst; } _this.push(buf); } else { _this.push(null); } }); }; ReadStream.prototype._destroy = function (err, cb) { var _this2 = this; // Usually for async IO it is safe to close a file descriptor // even when there are pending operations. However, due to platform // differences file IO is implemented using synchronous operations // running in a thread pool. Therefore, file descriptors are not safe // to close while used in a pending read or write operation. Wait for // any pending IO (kIsPerformingIO) to complete (kIoDone). if (this[kIsPerformingIO]) { this.once(kIoDone, function (er) { return close(_this2, err || er, cb); }); } else { close(this, err, cb); } }; ReadStream.prototype.close = function (cb) { if (typeof cb === 'function') finished(this, cb); this.destroy(); }; ObjectDefineProperty(ReadStream.prototype, 'pending', { get: function get() { return this.fd === null; }, configurable: true }); function WriteStream(path, options) { if (!(this instanceof WriteStream)) return new WriteStream(path, options); options = copyObject(getOptions(options, {})); // Only buffers are supported. options.decodeStrings = true; this[kFs] = options.fs || fs; validateFunction(this[kFs].open, 'options.fs.open'); if (!this[kFs].write && !this[kFs].writev) { throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function', this[kFs].write); } if (this[kFs].write) { validateFunction(this[kFs].write, 'options.fs.write'); } if (this[kFs].writev) { validateFunction(this[kFs].writev, 'options.fs.writev'); } validateFunction(this[kFs].close, 'options.fs.close'); // It's enough to override either, in which case only one will be used. if (!this[kFs].write) { this._write = null; } if (!this[kFs].writev) { this._writev = null; } options.autoDestroy = options.autoClose === undefined ? true : options.autoClose; // Path will be ignored when fd is specified, so it can be falsy this.path = toPathIfFileURL(path); this.flags = options.flags === undefined ? 'w' : options.flags; this.mode = options.mode === undefined ? 438 : options.mode; importFd(this, options); this.start = options.start; this.pos = undefined; this.bytesWritten = 0; this.closed = false; this[kIsPerformingIO] = false; // If fd has been set, validate, otherwise validate path. if (this.fd != null) { this.fd = getValidatedFd(this.fd); } else { validatePath(this.path); } if (this.start !== undefined) { validateInteger(this.start, 'start', 0); this.pos = this.start; } ReflectApply(Writable, this, [options]); if (options.encoding) this.setDefaultEncoding(options.encoding); } ObjectSetPrototypeOf(WriteStream.prototype, Writable.prototype); ObjectSetPrototypeOf(WriteStream, Writable); ObjectDefineProperty(WriteStream.prototype, 'autoClose', { get: function get() { return this._writableState.autoDestroy; }, set: function set(val) { this._writableState.autoDestroy = val; } }); var openWriteFs = deprecate(function () {// Noop. }, 'WriteStream.prototype.open() is deprecated', 'DEP0135'); WriteStream.prototype.open = openWriteFs; WriteStream.prototype._construct = _construct; WriteStream.prototype._write = function (data, encoding, cb) { var _this3 = this; this[kIsPerformingIO] = true; this[kFs].write(this.fd, data, 0, data.length, this.pos, function (er, bytes) { _this3[kIsPerformingIO] = false; if (_this3.destroyed) { // Tell ._destroy() that it's safe to close the fd now. cb(er); return _this3.emit(kIoDone, er); } if (er) { return cb(er); } _this3.bytesWritten += bytes; cb(); }); if (this.pos !== undefined) this.pos += data.length; }; WriteStream.prototype._writev = function (data, cb) { var _this4 = this; var len = data.length; var chunks = new Array(len); var size = 0; for (var i = 0; i < len; i++) { var chunk = data[i].chunk; chunks[i] = chunk; size += chunk.length; } this[kIsPerformingIO] = true; this[kFs].writev(this.fd, chunks, this.pos, function (er, bytes) { _this4[kIsPerformingIO] = false; if (_this4.destroyed) { // Tell ._destroy() that it's safe to close the fd now. cb(er); return _this4.emit(kIoDone, er); } if (er) { return cb(er); } _this4.bytesWritten += bytes; cb(); }); if (this.pos !== undefined) this.pos += size; }; WriteStream.prototype._destroy = function (err, cb) { var _this5 = this; // Usually for async IO it is safe to close a file descriptor // even when there are pending operations. However, due to platform // differences file IO is implemented using synchronous operations // running in a thread pool. Therefore, file descriptors are not safe // to close while used in a pending read or write operation. Wait for // any pending IO (kIsPerformingIO) to complete (kIoDone). if (this[kIsPerformingIO]) { this.once(kIoDone, function (er) { return close(_this5, err || er, cb); }); } else { close(this, err, cb); } }; WriteStream.prototype.close = function (cb) { if (cb) { if (this.closed) { process.nextTick(cb); return; } this.on('close', cb); } // If we are not autoClosing, we should call // destroy on 'finish'. if (!this.autoClose) { this.on('finish', this.destroy); } // We use end() instead of destroy() because of // https://github.com/nodejs/node/issues/2006 this.end(); }; // There is no shutdown() for files. WriteStream.prototype.destroySoon = WriteStream.prototype.end; ObjectDefineProperty(WriteStream.prototype, 'pending', { get: function get() { return this.fd === null; }, configurable: true }); module.exports = { ReadStream: ReadStream, WriteStream: WriteStream };