in gloo/transport/tcp/pair.cc [524:607]
bool Pair::read() {
if (state_ == CLOSED) {
return false;
}
NonOwningPtr<UnboundBuffer> buf;
auto start = std::chrono::steady_clock::now();
for (;;) {
struct iovec iov = {
.iov_base = nullptr,
.iov_len = 0,
};
const auto nbytes = prepareRead(rx_, buf, iov);
if (nbytes < 0) {
return false;
}
// Break from loop if the op is complete.
// Note that this means that the buffer pointer has been
// set, per the call to prepareRead.
if (nbytes == 0) {
break;
}
// If busy-poll has been requested AND sync mode has been enabled for pair
// we'll keep spinning calling recv() on socket by supplying MSG_DONTWAIT
// flag. This is more efficient in terms of latency than allowing the kernel
// to de-schedule this thread waiting for IO event to happen. The tradeoff
// is stealing the CPU core just for busy polling.
ssize_t rv = 0;
for (;;) {
// Alas, readv does not support flags, so we need to use recv
rv = ::recv(fd_, iov.iov_base, iov.iov_len, busyPoll_ ? MSG_DONTWAIT : 0);
if (rv == -1) {
// EAGAIN happens when (1) non-blocking and there are no more bytes left
// to read or (2) blocking and timeout occurs.
if (errno == EAGAIN) {
if (sync_) {
// Sync mode: EAGAIN indicates nothing to read right now.
auto hasTimedOut = [&] {
return (timeout_ != kNoTimeout) &&
((std::chrono::steady_clock::now() - start) >= timeout_);
};
if (busyPoll_ && !hasTimedOut()) {
// Keep looping on EAGAIN if busy-poll flag has been set and the
// timeout (if set) hasn't been reached
continue;
} else {
// Either timeout on poll or blocking call returning with EAGAIN
// indicates timeout
signalException(GLOO_ERROR_MSG("Read timeout ", peer_.str()));
}
} else {
// Async mode: can't read more than this.
}
return false;
}
// Retry on EINTR
if (errno == EINTR) {
continue;
}
// Unexpected error
signalException(
GLOO_ERROR_MSG("Read error ", peer_.str(), ": ", strerror(errno)));
return false;
}
break;
}
// Transition to CLOSED on EOF
if (rv == 0) {
signalException(
GLOO_ERROR_MSG("Connection closed by peer ", peer_.str()));
return false;
}
rx_.nread += rv;
}
readComplete(buf);
return true;
}