in tensorpipe/common/ringbuffer_read_write_ops.h [138:193]
size_t RingbufferReadOperation::handleRead(
    RingBufferRole<NumRoles, RoleIdx>& inbox) {
  // Makes as much progress on this read operation as `inbox` allows within a
  // single transaction: first the uint32_t length prefix (READ_LENGTH), then
  // the payload (READ_PAYLOAD). Returns the sum of the non-negative results of
  // the reads performed by this call, and invokes the callback with
  // Error::kSuccess once completed() reports true. Any negative result other
  // than -ENODATA is raised through TP_THROW_SYSTEM.
  size_t consumed = 0;

  // Open the read transaction. This end of the connection is the sole consumer
  // of this ringbuffer and every read happens on the reactor thread, so no
  // other transaction can be in flight; if opening fails anyway, fail hard.
  const ssize_t beginResult = inbox.startTx();
  TP_THROW_SYSTEM_IF(beginResult < 0, -beginResult);

  if (mode_ == READ_LENGTH) {
    uint32_t wireLength;
    // The length prefix is read all-or-nothing (no partial reads).
    const ssize_t lengthResult =
        inbox.template readInTx</*AllowPartial=*/false>(
            &wireLength, sizeof(wireLength));
    if (unlikely(lengthResult < 0)) {
      // -ENODATA is tolerated: the operation simply stays in READ_LENGTH.
      if (unlikely(lengthResult != -ENODATA)) {
        TP_THROW_SYSTEM(-lengthResult);
      }
    } else {
      consumed += lengthResult;
      mode_ = READ_PAYLOAD;
      if (nopObject_ == nullptr && ptrProvided_) {
        // Caller-supplied buffer: the announced length must match its size.
        TP_DCHECK_EQ(wireLength, len_);
      } else {
        len_ = wireLength;
        if (nopObject_ == nullptr) {
          // No destination was supplied and there is no nop object: allocate
          // an internal buffer of the announced size and read into it.
          buf_ = std::make_unique<uint8_t[]>(len_);
          ptr_ = buf_.get();
        }
      }
    }
  }

  // Deliberately not an `else`: a length read that just succeeded falls
  // straight through to the payload within the same transaction.
  if (mode_ == READ_PAYLOAD) {
    ssize_t payloadResult;
    if (nopObject_ == nullptr) {
      // Resume right after the bytes already received; partial reads are
      // allowed here.
      uint8_t* const destination =
          reinterpret_cast<uint8_t*>(ptr_) + bytesRead_;
      const auto remaining = len_ - bytesRead_;
      payloadResult = inbox.template readInTx</*AllowPartial=*/true>(
          destination, remaining);
    } else {
      payloadResult = readNopObject(inbox);
    }

    if (unlikely(payloadResult < 0)) {
      // As above, -ENODATA is not an error; anything else is fatal.
      if (unlikely(payloadResult != -ENODATA)) {
        TP_THROW_SYSTEM(-payloadResult);
      }
    } else {
      bytesRead_ += payloadResult;
      consumed += payloadResult;
    }
  }

  const ssize_t commitResult = inbox.commitTx();
  TP_THROW_SYSTEM_IF(commitResult < 0, -commitResult);

  // Notify the user only after the transaction has been committed.
  if (completed()) {
    fn_(Error::kSuccess, ptr_, len_);
  }

  return consumed;
}