Readable.prototype.read = function()

in src/js/node/stream.ts [2955:3101]


    /**
     * Pull data out of the stream's internal buffer.
     *
     * The method works in three phases (see the long comment in the body):
     *   1. work out how much can be returned via `howMuchToRead(n, state)`;
     *   2. decide whether the underlying source must be asked for more data and,
     *      if so, call `this._read(state.highWaterMark)`;
     *   3. take the chunk out of the buffer with `fromList(n, state)`, update the
     *      bookkeeping on `this._readableState`, and emit `'data'`.
     *
     * Side effects visible in this function: may raise `state.highWaterMark`,
     * toggles `state.emittedReadable` / `state.needReadable` / `state.reading` /
     * `state.sync`, resets `state.awaitDrainWriters`, may call `emitReadable`,
     * `endReadable` or `errorOrDestroy` on this stream, and emits `'data'` when a
     * chunk is returned and neither an error nor close has been emitted.
     *
     * @param {number} [n] Amount requested. Non-integer values are coerced with
     *   `NumberParseInt(n, 10)`. `read(0)` never returns data from the early
     *   "trigger readable" branch; it only drives the state machine.
     * @returns {*} The value produced by `fromList(n, state)`, or `null` when
     *   nothing is returned on this call.
     */
    Readable.prototype.read = function (n) {
      $debug("read - n =", n, this.__id);
      if (!NumberIsInteger(n)) {
        n = NumberParseInt(n, 10);
      }
      const state = this._readableState;
      // Remember the caller's request: `n` is rewritten below, and the original
      // value is needed both to recompute after a synchronous _read() and to
      // detect a read past EOF.
      const nOrig = n;

      // If we're asking for more than the current hwm, then raise the hwm.
      if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n);

      if (n !== 0) state.emittedReadable = false;

      // If we're doing read(0) to trigger a readable event, but we
      // already have a bunch of data in the buffer, then just trigger
      // the 'readable' event and move on.
      if (
        n === 0 &&
        state.needReadable &&
        ((state.highWaterMark !== 0 ? state.length >= state.highWaterMark : state.length > 0) || state.ended)
      ) {
        $debug("read: emitReadable or endReadable", state.length, state.ended, this.__id);
        if (state.length === 0 && state.ended) endReadable(this);
        else emitReadable(this);
        return null;
      }

      n = howMuchToRead(n, state);

      // If we've ended, and we're now clear, then finish it up.
      if (n === 0 && state.ended) {
        $debug("read: calling endReadable if length 0 -- length, state.ended", state.length, state.ended, this.__id);
        if (state.length === 0) endReadable(this);
        return null;
      }

      // All the actual chunk generation logic needs to be
      // *below* the call to _read.  The reason is that in certain
      // synthetic stream cases, such as passthrough streams, _read
      // may be a completely synchronous operation which may change
      // the state of the read buffer, providing enough data when
      // before there was *not* enough.
      //
      // So, the steps are:
      // 1. Figure out what the state of things will be after we do
      // a read from the buffer.
      //
      // 2. If that resulting state will trigger a _read, then call _read.
      // Note that this may be asynchronous, or synchronous.  Yes, it is
      // deeply ugly to write APIs this way, but that still doesn't mean
      // that the Readable class should behave improperly, as streams are
      // designed to be sync/async agnostic.
      // Take note if the _read call is sync or async (ie, if the read call
      // has returned yet), so that we know whether or not it's safe to emit
      // 'readable' etc.
      //
      // 3. Actually pull the requested chunks out of the buffer and return.

      // if we need a readable event, then we need to do some reading.
      let doRead = state.needReadable;
      $debug("need readable", doRead, this.__id);

      // If we currently have less than the highWaterMark, then also read some.
      if (state.length === 0 || state.length - n < state.highWaterMark) {
        doRead = true;
        $debug("length less than watermark", doRead, this.__id);
      }

      // However, if we've ended, then there's no point, if we're already
      // reading, then it's unnecessary, if we're constructing we have to wait,
      // and if we're destroyed or errored, then it's not allowed,
      if (state.ended || state.reading || state.destroyed || state.errored || !state.constructed) {
        $debug("state.constructed?", state.constructed, this.__id);
        doRead = false;
        $debug("reading, ended or constructing", doRead, this.__id);
      } else if (doRead) {
        $debug("do read", this.__id);
        state.reading = true;
        state.sync = true;
        // If the length is currently zero, then we *need* a readable event.
        if (state.length === 0) state.needReadable = true;

        // Call internal read method
        try {
          let result = this._read(state.highWaterMark);
          if ($isPromise(result)) {
            $debug("async _read", this.__id);
            // Bun.peek() hands back a settled promise's value without waiting a
            // tick; when the promise is not settled it returns the promise itself.
            // NOTE(review): if peek returns the rejection reason for an
            // already-rejected promise, that rejection is not forwarded to
            // errorOrDestroy below — confirm against Bun.peek semantics.
            const peeked = Bun.peek(result);
            $debug("peeked promise", peeked, this.__id);
            if (peeked !== result) {
              result = peeked;
            }
          }

          if ($isPromise(result) && result?.then && $isCallable(result.then)) {
            $debug("async _read result.then setup", this.__id);
            // Arrow function on purpose: the rejection handler must see the
            // stream as `this`. A plain `function` callback would run with an
            // unbound `this`, so the error would never reach this stream.
            result.then(nop, err => {
              errorOrDestroy(this, err);
            });
          }
        } catch (err) {
          // A synchronous throw from _read() is routed to the stream's error path.
          errorOrDestroy(this, err);
        }

        state.sync = false;
        // If _read pushed data synchronously, then `reading` will be false,
        // and we need to re-evaluate how much data we can return to the user.
        if (!state.reading) n = howMuchToRead(nOrig, state);
      }

      $debug("n @ fromList", n, this.__id);
      let ret;
      if (n > 0) ret = fromList(n, state);
      else ret = null;

      $debug("ret @ read", ret, this.__id);

      if (ret === null) {
        // Nothing handed out: ask for a 'readable' event unless the buffer is
        // already above the high water mark.
        state.needReadable = state.length <= state.highWaterMark;
        $debug("state.length while ret = null", state.length, this.__id);
        n = 0;
      } else {
        state.length -= n;
        // Data was consumed, so forget which writers were waiting on 'drain'.
        if (state.multiAwaitDrain) {
          state.awaitDrainWriters.clear();
        } else {
          state.awaitDrainWriters = null;
        }
      }

      $debug("length", state.length, state.ended, nOrig, n);
      if (state.length === 0) {
        // If we have nothing in the buffer, then we want to know
        // as soon as we *do* get something into the buffer.
        if (!state.ended) state.needReadable = true;

        // If we tried to read() past the EOF, then emit end on the next tick.
        if (nOrig !== n && state.ended) endReadable(this);
      }

      if (ret !== null && !state.errorEmitted && !state.closeEmitted) {
        state.dataEmitted = true;
        this.emit("data", ret);
      }

      return ret;
    };