in gloo/transport/uv/pair.cc [203:262]
// Read-completion callback for this pair.
//
// Runs under mutex_ and advances the in-flight read operation (readOp_):
//   1. The first event of an operation must cover exactly the preamble.
//   2. For SEND_UNBOUND_BUFFER with no buffer bound yet, the oldest pending
//      recv registered under the preamble's tag is dequeued and, if it has a
//      non-zero length, a payload read into its buffer is issued and this
//      call returns early.
//      NOTE(review): presumably onRead fires again once that payload read
//      completes -- confirm against the libuv wrapper.
//   3. Once the bytes read equal preamble.nbytes, the opcode-specific handler
//      runs and the next preamble read is issued via readNextOp().
//
// The libuv::TCP argument is unused.
void Pair::onRead(const libuv::ReadEvent& event, const libuv::TCP&) {
  std::lock_guard<std::mutex> guard(mutex_);
  auto& op = readOp_;

  // Nothing has been read for this operation yet, so this event has to be
  // the complete preamble.
  if (op.nread == 0) {
    GLOO_ENFORCE(event.length == sizeof(op.preamble));
  }

  // Running total of bytes read for the current operation.
  op.nread += event.length;

  const auto kind = op.getOpcode();
  const bool isSend = (kind == Op::SEND_UNBOUND_BUFFER);

  // Phase 1: the remote side is sending data and no destination is bound yet.
  // Take the oldest pending recv for this tag and bind its unbound buffer.
  if (isSend && !op.buf) {
    auto it = localPendingRecv_.find(op.preamble.tag);
    GLOO_ENFORCE(it != localPendingRecv_.end());

    // A map entry is expected to always hold at least one pending recv.
    auto& queue = it->second;
    GLOO_ENFORCE(!queue.empty());

    // Take the pending recv out of the queue; drop the map entry once the
    // queue for this tag has drained.
    auto recv = std::move(queue.front());
    queue.pop_front();
    if (queue.empty()) {
      localPendingRecv_.erase(it);
    }

    // Lock pointer to unbound buffer and remember where the payload goes.
    op.buf = NonOwningPtr<UnboundBuffer>(recv.buf);
    GLOO_ENFORCE(op.buf, "Cannot lock pointer to unbound buffer");
    op.offset = recv.offset;
    op.length = recv.length;

    // Non-empty payload: issue the read into the buffer and stop here; the
    // operation is finished on a later invocation. An empty payload falls
    // through and completes immediately.
    if (op.length != 0) {
      char* const destination = static_cast<char*>(op.buf->ptr) + op.offset;
      handle_->read(destination, op.length);
      return;
    }
  }

  // Phase 2: the operation must now be fully read; hand it to the handler
  // that matches its opcode.
  if (isSend) {
    GLOO_ENFORCE(op.nread == op.preamble.nbytes);
    onSendUnboundBuffer(op);
  } else if (kind == Op::NOTIFY_SEND_READY) {
    GLOO_ENFORCE(op.nread == op.preamble.nbytes);
    onNotifySendReady(op);
  } else if (kind == Op::NOTIFY_RECV_READY) {
    GLOO_ENFORCE(op.nread == op.preamble.nbytes);
    onNotifyRecvReady(op);
  } else {
    FAIL("Unexpected opcode: ", kind);
  }

  // Reset read operation and issue read for the next preamble.
  readNextOp();
}