size_t RingbufferReadOperation::handleRead()

in tensorpipe/common/ringbuffer_read_write_ops.h [138:193]


size_t RingbufferReadOperation::handleRead(
    RingBufferRole<NumRoles, RoleIdx>& inbox) {
  ssize_t ret;
  size_t bytesReadNow = 0;

  // Start read transaction. This end of the connection is the only consumer for
  // this ringbuffer, and all reads are done from the reactor thread, so there
  // cannot be another transaction already going on. Fail hard in case.
  ret = inbox.startTx();
  TP_THROW_SYSTEM_IF(ret < 0, -ret);

  if (mode_ == READ_LENGTH) {
    uint32_t length;
    ret = inbox.template readInTx</*AllowPartial=*/false>(
        &length, sizeof(length));
    if (likely(ret >= 0)) {
      mode_ = READ_PAYLOAD;
      bytesReadNow += ret;
      if (nopObject_ != nullptr) {
        len_ = length;
      } else if (ptrProvided_) {
        TP_DCHECK_EQ(length, len_);
      } else {
        len_ = length;
        buf_ = std::make_unique<uint8_t[]>(len_);
        ptr_ = buf_.get();
      }
    } else if (unlikely(ret != -ENODATA)) {
      TP_THROW_SYSTEM(-ret);
    }
  }

  if (mode_ == READ_PAYLOAD) {
    if (nopObject_ != nullptr) {
      ret = readNopObject(inbox);
    } else {
      ret = inbox.template readInTx</*AllowPartial=*/true>(
          reinterpret_cast<uint8_t*>(ptr_) + bytesRead_, len_ - bytesRead_);
    }
    if (likely(ret >= 0)) {
      bytesRead_ += ret;
      bytesReadNow += ret;
    } else if (unlikely(ret != -ENODATA)) {
      TP_THROW_SYSTEM(-ret);
    }
  }

  ret = inbox.commitTx();
  TP_THROW_SYSTEM_IF(ret < 0, -ret);

  if (completed()) {
    fn_(Error::kSuccess, ptr_, len_);
  }

  return bytesReadNow;
}