in src/js/node/stream.ts [2955:3101]
Readable.prototype.read = function (n) {
$debug("read - n =", n, this.__id);
if (!NumberIsInteger(n)) {
n = NumberParseInt(n, 10);
}
const state = this._readableState;
const nOrig = n;
// If we're asking for more than the current hwm, then raise the hwm.
if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n);
if (n !== 0) state.emittedReadable = false;
// If we're doing read(0) to trigger a readable event, but we
// already have a bunch of data in the buffer, then just trigger
// the 'readable' event and move on.
if (
n === 0 &&
state.needReadable &&
((state.highWaterMark !== 0 ? state.length >= state.highWaterMark : state.length > 0) || state.ended)
) {
$debug("read: emitReadable or endReadable", state.length, state.ended, this.__id);
if (state.length === 0 && state.ended) endReadable(this);
else emitReadable(this);
return null;
}
n = howMuchToRead(n, state);
// If we've ended, and we're now clear, then finish it up.
if (n === 0 && state.ended) {
$debug("read: calling endReadable if length 0 -- length, state.ended", state.length, state.ended, this.__id);
if (state.length === 0) endReadable(this);
return null;
}
// All the actual chunk generation logic needs to be
// *below* the call to _read. The reason is that in certain
// synthetic stream cases, such as passthrough streams, _read
// may be a completely synchronous operation which may change
// the state of the read buffer, providing enough data when
// before there was *not* enough.
//
// So, the steps are:
// 1. Figure out what the state of things will be after we do
// a read from the buffer.
//
// 2. If that resulting state will trigger a _read, then call _read.
// Note that this may be asynchronous, or synchronous. Yes, it is
// deeply ugly to write APIs this way, but that still doesn't mean
// that the Readable class should behave improperly, as streams are
// designed to be sync/async agnostic.
// Take note if the _read call is sync or async (ie, if the read call
// has returned yet), so that we know whether or not it's safe to emit
// 'readable' etc.
//
// 3. Actually pull the requested chunks out of the buffer and return.
// if we need a readable event, then we need to do some reading.
let doRead = state.needReadable;
$debug("need readable", doRead, this.__id);
// If we currently have less than the highWaterMark, then also read some.
if (state.length === 0 || state.length - n < state.highWaterMark) {
doRead = true;
$debug("length less than watermark", doRead, this.__id);
}
// However, if we've ended, then there's no point, if we're already
// reading, then it's unnecessary, if we're constructing we have to wait,
// and if we're destroyed or errored, then it's not allowed,
if (state.ended || state.reading || state.destroyed || state.errored || !state.constructed) {
$debug("state.constructed?", state.constructed, this.__id);
doRead = false;
$debug("reading, ended or constructing", doRead, this.__id);
} else if (doRead) {
$debug("do read", this.__id);
state.reading = true;
state.sync = true;
// If the length is currently zero, then we *need* a readable event.
if (state.length === 0) state.needReadable = true;
// Call internal read method
try {
var result = this._read(state.highWaterMark);
if ($isPromise(result)) {
$debug("async _read", this.__id);
const peeked = Bun.peek(result);
$debug("peeked promise", peeked, this.__id);
if (peeked !== result) {
result = peeked;
}
}
if ($isPromise(result) && result?.then && $isCallable(result.then)) {
$debug("async _read result.then setup", this.__id);
result.then(nop, function (err) {
errorOrDestroy(this, err);
});
}
} catch (err) {
errorOrDestroy(this, err);
}
state.sync = false;
// If _read pushed data synchronously, then `reading` will be false,
// and we need to re-evaluate how much data we can return to the user.
if (!state.reading) n = howMuchToRead(nOrig, state);
}
$debug("n @ fromList", n, this.__id);
let ret;
if (n > 0) ret = fromList(n, state);
else ret = null;
$debug("ret @ read", ret, this.__id);
if (ret === null) {
state.needReadable = state.length <= state.highWaterMark;
$debug("state.length while ret = null", state.length, this.__id);
n = 0;
} else {
state.length -= n;
if (state.multiAwaitDrain) {
state.awaitDrainWriters.clear();
} else {
state.awaitDrainWriters = null;
}
}
$debug("length", state.length, state.ended, nOrig, n);
if (state.length === 0) {
// If we have nothing in the buffer, then we want to know
// as soon as we *do* get something into the buffer.
if (!state.ended) state.needReadable = true;
// If we tried to read() past the EOF, then emit end on the next tick.
if (nOrig !== n && state.ended) endReadable(this);
}
if (ret !== null && !state.errorEmitted && !state.closeEmitted) {
state.dataEmitted = true;
this.emit("data", ret);
}
return ret;
};