in gloo/transport/tcp/pair.cc [449:515]
// Points `iov` at the memory region that the next read for the in-flight
// receive operation `op` should fill.
//
// While fewer than sizeof(op.preamble) bytes have been read (op.nread),
// `iov` covers the unread remainder of the preamble. Afterwards, for the
// SEND_BUFFER and SEND_UNBOUND_BUFFER opcodes, `iov` covers the unread
// remainder of the payload inside the destination buffer.
//
// Parameters:
//   op   Receive operation state; op.buf / op.ubuf / op.offset / op.nbytes
//        are populated here the first time the payload is reached.
//   buf  Out: for SEND_UNBOUND_BUFFER, set to a short lived pointer to the
//        destination unbound buffer. Not touched on the other paths.
//   iov  Out: always reset first; describes the region to read into.
//
// Returns:
//   > 0  number of bytes that remain to be read into `iov`
//     0  nothing (more) to read for this operation; this is also the
//        result for opcodes other than the two handled here
//    -1  the destination buffer is not available (yet); `iov` is empty
//
// Raises a GLOO_ENFORCE failure if no receive is pending for the slot of
// an unbound buffer send, or if the payload described by the preamble does
// not fit in the destination buffer.
ssize_t Pair::prepareRead(
    Op& op,
    NonOwningPtr<UnboundBuffer>& buf,
    struct iovec& iov) {
  iov.iov_base = nullptr;
  iov.iov_len = 0;

  // Read preamble
  if (op.nread < sizeof(op.preamble)) {
    iov.iov_base = ((char*)&op.preamble) + op.nread;
    iov.iov_len = sizeof(op.preamble) - op.nread;
    return iov.iov_len;
  }

  auto opcode = op.getOpcode();
  // Number of payload bytes that were already read.
  auto offset = op.nread - sizeof(op.preamble);

  // Remote side is sending data to a buffer; read payload
  if (opcode == Op::SEND_BUFFER) {
    if (op.buf == nullptr) {
      op.buf = getBuffer(op.preamble.slot);
      // Buffer not (yet) registered, leave it for next loop iteration
      if (op.buf == nullptr) {
        return -1;
      }
    }

    // Bytes read must be in bounds for target buffer. The preamble values
    // were read from the peer, so validate them *before* doing any pointer
    // arithmetic, and phrase the check so that it cannot wrap around:
    // `roffset + length` may overflow for a huge roffset and would then
    // slip past a naive `roffset + length <= size` comparison.
    // NOTE(review): assumes roffset, length and size_ are unsigned.
    GLOO_ENFORCE_LE(op.preamble.roffset, op.buf->size_);
    GLOO_ENFORCE_LE(op.preamble.length, op.buf->size_ - op.preamble.roffset);

    iov.iov_base = ((char*)op.buf->ptr_) + offset + op.preamble.roffset;
    iov.iov_len = op.preamble.length - offset;
    return iov.iov_len;
  }

  // Remote side is sending data to an unbound buffer; read payload
  if (opcode == Op::SEND_UNBOUND_BUFFER) {
    if (!op.ubuf) {
      // First time the payload is reached: take the oldest pending
      // receive for this slot and bind it to the operation.
      auto it = localPendingRecv_.find(op.preamble.slot);
      GLOO_ENFORCE(it != localPendingRecv_.end());
      std::deque<UnboundBufferOp>& queue = it->second;
      GLOO_ENFORCE(!queue.empty());
      std::tie(op.ubuf, op.offset, op.nbytes) = queue.front();
      queue.pop_front();
      // Don't keep empty queues around in the map.
      if (queue.empty()) {
        localPendingRecv_.erase(it);
      }
    }

    // Acquire short lived pointer to unbound buffer.
    // This is a stack allocated variable in the read function
    // which is destructed upon that function returning.
    buf = NonOwningPtr<UnboundBuffer>(op.ubuf);
    if (!buf) {
      return -1;
    }

    // Bytes read must be in bounds for target buffer; check this before
    // computing the destination pointer.
    GLOO_ENFORCE_LE(op.preamble.length, op.nbytes);

    iov.iov_base = ((char*)buf->ptr) + op.offset + offset;
    iov.iov_len = op.preamble.length - offset;
    return iov.iov_len;
  }

  return 0;
}