tools/node-hermes/nodelib/internal/stream_base_commons.js (203 lines of code) (raw):
'use strict';
var _primordials = primordials,
Array = _primordials.Array,
_Symbol = _primordials.Symbol;
var _require = require('buffer'),
Buffer = _require.Buffer;
var _require2 = require('internal/buffer'),
FastBuffer = _require2.FastBuffer;
// var _internalBinding = internalBinding('stream_wrap'),
// WriteWrap = _internalBinding.WriteWrap;
// kReadBytesOrError = _internalBinding.kReadBytesOrError,
// kArrayBufferOffset = _internalBinding.kArrayBufferOffset,
// kBytesWritten = _internalBinding.kBytesWritten,
// kLastWriteWasAsync = _internalBinding.kLastWriteWasAsync,
// streamBaseState = _internalBinding.streamBaseState;
// var _internalBinding2 = internalBinding('uv'),
// UV_EOF = _internalBinding2.UV_EOF;
var _require3 = require('internal/errors'),
errnoException = _require3.errnoException;
// var owner_symbol = require('internal/async_hooks').symbols.owner_symbol;
// var _require4 = require('internal/timers'),
// kTimeout = _require4.kTimeout,
// setUnrefTimeout = _require4.setUnrefTimeout,
// getTimerDuration = _require4.getTimerDuration;
var _require5 = require('internal/util/types'),
isUint8Array = _require5.isUint8Array;
// var _require6 = require('timers'),
// clearTimeout = _require6.clearTimeout;
var _require7 = require('internal/validators'),
validateCallback = _require7.validateCallback;
// Symbols used as property keys on stream objects by the helpers in this file.
// kMaybeDestroy: optional stream method that onStreamRead runs (or defers to
// the 'end' event) once EOF has been seen.
var kMaybeDestroy = _Symbol('kMaybeDestroy');
// kUpdateTimer: stream/session method called on read and write completion and
// when a timeout is (re)armed.
var kUpdateTimer = _Symbol('kUpdateTimer');
// kAfterAsyncWrite: stream method called with the write request from
// onWriteComplete after a successful write.
var kAfterAsyncWrite = _Symbol('kAfterAsyncWrite');
// kHandle: key under which a stream stores the handle used for writes.
var kHandle = _Symbol('kHandle');
// kSession: key of an optional session object whose timer is refreshed by
// setStreamTimeout.
var kSession = _Symbol('kSession');
// Debug logger for the 'stream' section. The callback handed to debuglog
// overwrites `debug` with the function it receives, so later calls go straight
// to that function.
var debug = require('internal/util/debuglog').debuglog('stream', function (fn) {
debug = fn;
});
// Keys read by onStreamRead for user-supplied read buffers:
// kBuffer holds the current buffer, kBufferGen an optional generator of the
// next buffer, kBufferCb the callback invoked with (nread, buffer).
var kBuffer = _Symbol('kBuffer');
var kBufferGen = _Symbol('kBufferGen');
var kBufferCb = _Symbol('kBufferCb');
/**
 * Dispatches one chunk to the write method of `req.handle` that matches
 * `encoding`.
 *
 * String encodings with a dedicated handle method (latin1/binary, utf8,
 * ascii, ucs2/utf16le) are forwarded as-is. The pseudo-encoding 'buffer'
 * passes `data` straight to `writeBuffer`; any other encoding is first
 * converted with `Buffer.from(data, encoding)`.
 *
 * The `stream_wrap` binding that normally provides `streamBaseState` and
 * `kLastWriteWasAsync` is commented out at the top of this file, so reading
 * them unguarded throws a ReferenceError in strict mode. The lookup is
 * therefore guarded with `typeof`: when the binding is absent the write is
 * treated as synchronous (the same assumption afterWriteDispatched makes by
 * hard-coding `req.async = false`) and the buffer is not retained on `req`.
 *
 * @param {object} req Write request; `req.handle` performs the write and
 *   `req.buffer` receives the written buffer if the write went async.
 * @param {string|Uint8Array} data Chunk to write.
 * @param {string} encoding Encoding name, or 'buffer' for binary chunks.
 * @returns {*} Whatever the handle's write method returned (callers compare
 *   it against 0).
 */
function handleWriteReq(req, data, encoding) {
  var handle = req.handle;
  switch (encoding) {
    case 'latin1':
    case 'binary':
      return handle.writeLatin1String(req, data);
    case 'utf8':
    case 'utf-8':
      return handle.writeUtf8String(req, data);
    case 'ascii':
      return handle.writeAsciiString(req, data);
    case 'ucs2':
    case 'ucs-2':
    case 'utf16le':
    case 'utf-16le':
      return handle.writeUcs2String(req, data);
  }

  // Binary path: 'buffer' chunks are written directly, every other encoding
  // is converted to a Buffer first.
  var buffer = encoding === 'buffer' ? data : Buffer.from(data, encoding);
  var ret = handle.writeBuffer(req, buffer);

  // Must be evaluated after the write: the flag describes the write that was
  // just issued. `typeof` is safe on undeclared identifiers.
  var wasAsync = typeof streamBaseState !== 'undefined' &&
    typeof kLastWriteWasAsync !== 'undefined' &&
    !!streamBaseState[kLastWriteWasAsync];
  // Keep the buffer reachable from the request while an async write is
  // still in flight.
  if (wasAsync) req.buffer = buffer;
  return ret;
}
/**
 * Completion callback installed as `req.oncomplete` by createWriteWrap.
 * Invoked with `this` bound to the write request.
 *
 * - If the owning stream is already destroyed, the request callback (if any)
 *   is called with `null` and nothing else happens.
 * - If `status` is negative, an errno exception is built and handed to the
 *   request callback, or used to destroy the stream when there is none.
 * - Otherwise the stream's timer is refreshed, its kAfterAsyncWrite hook runs,
 *   and the request callback (if any) is called with `null`.
 *
 * NOTE(review): `owner_symbol` comes from a require that is commented out at
 * the top of this file; confirm it is provided elsewhere before relying on
 * this path.
 *
 * @param {number} status Completion status; negative values signal an error.
 */
function onWriteComplete(status) {
  var req = this;
  debug('onWriteComplete', status, req.error);
  var owner = req.handle[owner_symbol];

  if (owner.destroyed) {
    if (typeof req.callback === 'function') {
      req.callback(null);
    }
    return;
  }

  // TODO (ronag): This should be moved before if(stream.destroyed)
  // in order to avoid swallowing error.
  if (status < 0) {
    var writeError = errnoException(status, 'write', req.error);
    if (typeof req.callback === 'function') {
      req.callback(writeError);
    } else {
      owner.destroy(writeError);
    }
    return;
  }

  owner[kUpdateTimer]();
  owner[kAfterAsyncWrite](req);
  // Re-check the callback here: the hook above received the request object.
  if (typeof req.callback === 'function') {
    req.callback(null);
  }
}
/**
 * Builds a fresh write-request record for `handle`.
 *
 * A plain object is used in place of the native WriteWrap, whose binding
 * import is commented out in this file.
 *
 * @param {object} handle Handle that will perform the write.
 * @param {Function} callback Stored on the request as `callback`.
 * @returns {object} Request with `handle`, `oncomplete`, `async`, `bytes`,
 *   `buffer` and `callback` fields initialised.
 */
function createWriteWrap(handle, callback) {
  return {
    handle: handle,
    oncomplete: onWriteComplete,
    async: false,
    bytes: 0,
    buffer: null,
    callback: callback
  };
}
/**
 * Issues a vectored write of `data` on the handle stored at `self[kHandle]`.
 *
 * When `data.allBuffers` is truthy, `data` itself is reused as the chunk list
 * and each entry is replaced in place by its `.chunk`. Otherwise a new array
 * of twice the length is filled with alternating chunk / encoding values.
 *
 * @param {object} self Stream-like object holding the handle under kHandle.
 * @param {Array} data Entries with `chunk` (and `encoding`); may carry an
 *   `allBuffers` flag.
 * @param {Function} cb Completion callback, forwarded to afterWriteDispatched.
 * @returns {object} The write request that was created.
 */
function writevGeneric(self, data, cb) {
  var req = createWriteWrap(self[kHandle], cb);
  var allBuffers = data.allBuffers;
  var chunks;
  var idx = 0;

  if (!allBuffers) {
    // Mixed content: lay out [chunk0, encoding0, chunk1, encoding1, ...].
    chunks = new Array(data.length << 1);
    while (idx < data.length) {
      var item = data[idx];
      var slot = idx * 2;
      chunks[slot] = item.chunk;
      chunks[slot + 1] = item.encoding;
      idx += 1;
    }
  } else {
    // Buffers only: unwrap the entries in place and reuse the caller's array.
    chunks = data;
    while (idx < data.length) {
      data[idx] = data[idx].chunk;
      idx += 1;
    }
  }

  var err = req.handle.writev(req, chunks, allBuffers);
  // Retain chunks
  if (err === 0) req._chunks = chunks;
  afterWriteDispatched(req, err, cb);
  return req;
}
/**
 * Writes a single chunk on the handle stored at `self[kHandle]`.
 *
 * Creates a write request, dispatches it through handleWriteReq according to
 * `encoding`, then hands the result to afterWriteDispatched.
 *
 * @param {object} self Stream-like object holding the handle under kHandle.
 * @param {string|Uint8Array} data Chunk to write.
 * @param {string} encoding Encoding name understood by handleWriteReq.
 * @param {Function} cb Completion callback.
 * @returns {object} The write request that was created.
 */
function writeGeneric(self, data, encoding, cb) {
  var writeReq = createWriteWrap(self[kHandle], cb);
  var status = handleWriteReq(writeReq, data, encoding);
  afterWriteDispatched(writeReq, status, cb);
  return writeReq;
}
/**
 * Finishes a write that has just been handed to the handle.
 *
 * In this port the request is always marked synchronous (`req.async = false`)
 * and `req.bytes` is left untouched; the original assignments from
 * streamBaseState[kBytesWritten] / streamBaseState[kLastWriteWasAsync] are
 * disabled because that binding is not loaded in this file.
 *
 * @param {object} req Write request produced by createWriteWrap.
 * @param {number} err Result of the handle's write call; non-zero is an error.
 * @param {Function} cb Receives the errno exception when `err` is non-zero.
 * @returns {*} The return value of `cb` on error, otherwise undefined.
 */
function afterWriteDispatched(req, err, cb) {
  req.async = false;

  if (err !== 0) {
    return cb(errnoException(err, 'write', req.error));
  }

  // Synchronous completion: nothing else will signal the caller, so invoke
  // the request callback right away when there is one.
  if (req.async) return;
  if (typeof req.callback !== 'function') return;
  req.callback();
}
/**
 * Read callback for a stream handle; invoked with `this` bound to the handle.
 *
 * Reads the byte count / status from streamBaseState[kReadBytesOrError] and
 * then either delivers the data to the owning stream (through the user buffer
 * callback or stream.push), destroys the stream on a read error, or ends the
 * stream when the status equals UV_EOF.
 *
 * NOTE(review): streamBaseState, kReadBytesOrError, kArrayBufferOffset, UV_EOF
 * and owner_symbol come from requires that are commented out at the top of
 * this file; confirm they are provided elsewhere before relying on this path.
 *
 * @param {ArrayBuffer} arrayBuffer Backing store wrapped in a FastBuffer when
 *   no user buffer is installed on the stream.
 * @returns {Uint8Array|undefined} The next user buffer when the stream's
 *   kBufferGen produced one, otherwise undefined.
 */
function onStreamRead(arrayBuffer) {
var nread = streamBaseState[kReadBytesOrError];
var handle = this;
var stream = this[owner_symbol];
// Any read activity refreshes the stream's timer.
stream[kUpdateTimer]();
if (nread > 0 && !stream.destroyed) {
var ret;
var result;
var userBuf = stream[kBuffer];
if (userBuf) {
// User-supplied buffer: hand it to the registered callback; only an explicit
// `false` return counts as "stop reading".
result = stream[kBufferCb](nread, userBuf) !== false;
var bufGen = stream[kBufferGen];
if (bufGen !== null) {
// Ask the generator for the next buffer; it is installed on the stream and
// returned to the caller only if it is a Uint8Array.
var nextBuf = bufGen();
if (isUint8Array(nextBuf)) stream[kBuffer] = ret = nextBuf;
}
} else {
// No user buffer: wrap the bytes that were read and push them to the stream.
var offset = streamBaseState[kArrayBufferOffset];
var buf = new FastBuffer(arrayBuffer, offset, nread);
result = stream.push(buf);
}
if (!result) {
// The consumer does not want more data right now: stop reading on the handle,
// destroying the stream if readStop reports an error.
handle.reading = false;
if (!stream.destroyed) {
var err = handle.readStop();
if (err) stream.destroy(errnoException(err, 'read'));
}
}
return ret;
}
// Nothing was read and no status to act on.
if (nread === 0) {
return;
}
// Any remaining value other than UV_EOF is treated as a read error.
if (nread !== UV_EOF) {
// CallJSOnreadMethod expects the return value to be a buffer.
// Ref: https://github.com/nodejs/node/pull/34375
stream.destroy(errnoException(nread, 'read'));
return;
} // Defer this until we actually emit end
if (stream._readableState.endEmitted) {
if (stream[kMaybeDestroy]) stream[kMaybeDestroy]();
} else {
if (stream[kMaybeDestroy]) stream.on('end', stream[kMaybeDestroy]); // TODO(ronag): Without this `readStop`, `onStreamRead`
// will be called once more (i.e. after Readable.ended)
// on Windows causing a ECONNRESET, failing the
// test-https-truncate test.
if (handle.readStop) {
var _err = handle.readStop();
if (_err) {
// CallJSOnreadMethod expects the return value to be a buffer.
// Ref: https://github.com/nodejs/node/pull/34375
stream.destroy(errnoException(_err, 'read'));
return;
}
} // Push a null to signal the end of data.
// Do it before `maybeDestroy` for correct order of events:
// `end` -> `close`
stream.push(null);
// Zero-length read issued right after signalling end of data.
stream.read(0);
}
}
/**
 * Arms, re-arms or disables the inactivity timeout of a stream. Called with
 * `this` bound to the stream.
 *
 * The raw `msecs` value is stored on `this.timeout`; the validated duration
 * returned by getTimerDuration decides what happens next. Any existing timer
 * is always cleared first. A duration of 0 disables the timeout and removes
 * `callback` from the 'timeout' event; any other duration schedules
 * `this._onTimeout`, refreshes the session timer if a session is attached,
 * and registers `callback` as a one-shot 'timeout' listener.
 *
 * NOTE(review): getTimerDuration, setUnrefTimeout, kTimeout and clearTimeout
 * come from requires that are commented out at the top of this file; confirm
 * they are provided elsewhere before relying on this path.
 *
 * @param {number} msecs Timeout in milliseconds; 0 disables it.
 * @param {Function} [callback] Optional 'timeout' listener.
 * @returns {object} `this`, for chaining.
 */
function setStreamTimeout(msecs, callback) {
  if (this.destroyed) return this;

  this.timeout = msecs;

  // Type checking identical to timers.enroll()
  var duration = getTimerDuration(msecs, 'msecs');

  // Attempt to clear an existing timer in both cases -
  // even if it will be rescheduled we don't want to leak an existing timer.
  clearTimeout(this[kTimeout]);

  var hasCallback = callback !== undefined;

  if (duration === 0) {
    // Disabling: drop the listener that a previous call may have registered.
    if (hasCallback) {
      validateCallback(callback);
      this.removeListener('timeout', callback);
    }
    return this;
  }

  this[kTimeout] = setUnrefTimeout(this._onTimeout.bind(this), duration);
  if (this[kSession]) this[kSession][kUpdateTimer]();

  // The callback is validated only after the timer has been armed.
  if (hasCallback) {
    validateCallback(callback);
    this.once('timeout', callback);
  }
  return this;
}
// Public surface of this module: the two write entry points, the read
// callback, the timeout setter, and the symbols they use as property keys.
// handleWriteReq, onWriteComplete, createWriteWrap and afterWriteDispatched
// stay private to this file.
module.exports = {
writevGeneric: writevGeneric,
writeGeneric: writeGeneric,
onStreamRead: onStreamRead,
kAfterAsyncWrite: kAfterAsyncWrite,
kMaybeDestroy: kMaybeDestroy,
kUpdateTimer: kUpdateTimer,
kHandle: kHandle,
kSession: kSession,
setStreamTimeout: setStreamTimeout,
kBuffer: kBuffer,
kBufferCb: kBufferCb,
kBufferGen: kBufferGen
};