bool Pair::read()

in gloo/transport/tcp/pair.cc [524:607]


bool Pair::read() {
  if (state_ == CLOSED) {
    return false;
  }
  NonOwningPtr<UnboundBuffer> buf;
  auto start = std::chrono::steady_clock::now();

  for (;;) {
    struct iovec iov = {
        .iov_base = nullptr,
        .iov_len = 0,
    };
    const auto nbytes = prepareRead(rx_, buf, iov);
    if (nbytes < 0) {
      return false;
    }

    // Break from loop if the op is complete.
    // Note that this means that the buffer pointer has been
    // set, per the call to prepareRead.
    if (nbytes == 0) {
      break;
    }

    // If busy-poll has been requested AND sync mode has been enabled for pair
    // we'll keep spinning calling recv() on socket by supplying MSG_DONTWAIT
    // flag. This is more efficient in terms of latency than allowing the kernel
    // to de-schedule this thread waiting for IO event to happen. The tradeoff
    // is stealing the CPU core just for busy polling.
    ssize_t rv = 0;
    for (;;) {
      // Alas, readv does not support flags, so we need to use recv
      rv = ::recv(fd_, iov.iov_base, iov.iov_len, busyPoll_ ? MSG_DONTWAIT : 0);
      if (rv == -1) {
        // EAGAIN happens when (1) non-blocking and there are no more bytes left
        // to read or (2) blocking and timeout occurs.
        if (errno == EAGAIN) {
          if (sync_) {
            // Sync mode: EAGAIN indicates nothing to read right now.
            auto hasTimedOut = [&] {
              return (timeout_ != kNoTimeout) &&
                  ((std::chrono::steady_clock::now() - start) >= timeout_);
            };
            if (busyPoll_ && !hasTimedOut()) {
              // Keep looping on EAGAIN if busy-poll flag has been set and the
              // timeout (if set) hasn't been reached
              continue;
            } else {
              // Either timeout on poll or blocking call returning with EAGAIN
              // indicates timeout
              signalException(GLOO_ERROR_MSG("Read timeout ", peer_.str()));
            }
          } else {
            // Async mode: can't read more than this.
          }
          return false;
        }

        // Retry on EINTR
        if (errno == EINTR) {
          continue;
        }

        // Unexpected error
        signalException(
            GLOO_ERROR_MSG("Read error ", peer_.str(), ": ", strerror(errno)));
        return false;
      }
      break;
    }

    // Transition to CLOSED on EOF
    if (rv == 0) {
      signalException(
          GLOO_ERROR_MSG("Connection closed by peer ", peer_.str()));
      return false;
    }

    rx_.nread += rv;
  }

  readComplete(buf);
  return true;
}