void Pair::onRead()

in gloo/transport/uv/pair.cc [203:262]


void Pair::onRead(const libuv::ReadEvent& event, const libuv::TCP&) {
  std::lock_guard<std::mutex> lock(mutex_);
  auto& op = readOp_;

  // If this is the first read for the current operation,
  // assert that we read the entire preamble.
  if (op.nread == 0) {
    GLOO_ENFORCE(event.length == sizeof(op.preamble));
  }

  // Accumulate the total number of bytes read for the current operation.
  op.nread += event.length;

  // Dispatch on the opcode carried in the preamble.
  const auto opcode = op.getOpcode();
  if (opcode == Op::SEND_UNBOUND_BUFFER) {
    // Remote side is sending data; find pending recv operation
    // and read into the associated unbound buffer.
    if (!op.buf) {
      auto it = localPendingRecv_.find(op.preamble.tag);
      GLOO_ENFORCE(it != localPendingRecv_.end());

      // Ensure queue of pending recv operations is not empty.
      auto& queue = it->second;
      GLOO_ENFORCE(!queue.empty());

      // Move pending recv operation to stack.
      auto pendingRecv = std::move(queue.front());
      queue.pop_front();
      if (queue.empty()) {
        localPendingRecv_.erase(it);
      }

      // Lock pointer to unbound buffer.
      op.buf = NonOwningPtr<UnboundBuffer>(pendingRecv.buf);
      GLOO_ENFORCE(op.buf, "Cannot lock pointer to unbound buffer");
      op.offset = pendingRecv.offset;
      op.length = pendingRecv.length;

      // Read the payload into the unbound buffer, if the read is
      // non-empty. The read completes asynchronously: onRead fires
      // again once the payload has arrived, at which point op.buf is
      // already set and the branch above is skipped.
      if (op.length) {
        handle_->read((char*)op.buf->ptr + op.offset, op.length);
        return;
      }
    }

    GLOO_ENFORCE(op.nread == op.preamble.nbytes);
    onSendUnboundBuffer(op);
  } else if (opcode == Op::NOTIFY_SEND_READY) {
    // Remote side has a send pending for this tag.
    GLOO_ENFORCE(op.nread == op.preamble.nbytes);
    onNotifySendReady(op);
  } else if (opcode == Op::NOTIFY_RECV_READY) {
    // Remote side has posted a recv for this tag.
    GLOO_ENFORCE(op.nread == op.preamble.nbytes);
    onNotifyRecvReady(op);
  } else {
    FAIL("Unexpected opcode: ", opcode);
  }

  // Reset read operation and issue read for the next preamble.
  readNextOp();
}
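
For orientation, the two-phase read that this handler implements (fixed-size preamble first, then the payload read straight into a matched unbound buffer) can be reduced to the synchronous sketch below. The Preamble and PendingRecv structs, the readExact helper, and the enum are illustrative assumptions rather than the actual gloo/transport/uv types; only the opcode names and the preamble-then-payload split mirror the function above.

#include <unistd.h>

#include <cstddef>
#include <cstdint>
#include <deque>
#include <unordered_map>

// Illustrative stand-ins for the real gloo/transport/uv types.
enum class Opcode { SEND_UNBOUND_BUFFER, NOTIFY_SEND_READY, NOTIFY_RECV_READY };

struct Preamble {
  Opcode opcode;
  uint64_t tag;   // slot used to match a send with a pending recv
  size_t nbytes;  // total message size (preamble plus payload)
};

struct PendingRecv {
  char* ptr;      // base pointer of the destination buffer
  size_t offset;  // offset into the destination buffer
  size_t length;  // number of payload bytes expected
};

// Hypothetical blocking helper standing in for the asynchronous libuv read.
static bool readExact(int fd, void* buf, size_t len) {
  char* p = static_cast<char*>(buf);
  while (len > 0) {
    const ssize_t n = ::read(fd, p, len);
    if (n <= 0) {
      return false;
    }
    p += n;
    len -= static_cast<size_t>(n);
  }
  return true;
}

// Synchronous outline of the read path in Pair::onRead().
void readOneMessage(
    int fd,
    std::unordered_map<uint64_t, std::deque<PendingRecv>>& localPendingRecv) {
  // Phase 1: read the fixed-size preamble.
  Preamble preamble;
  if (!readExact(fd, &preamble, sizeof(preamble))) {
    return;
  }

  if (preamble.opcode == Opcode::SEND_UNBOUND_BUFFER) {
    // Phase 2: pop the matching pending recv for this tag and read the
    // payload directly into its buffer.
    auto& queue = localPendingRecv.at(preamble.tag);
    PendingRecv recv = queue.front();
    queue.pop_front();
    if (recv.length > 0) {
      readExact(fd, recv.ptr + recv.offset, recv.length);
    }
    // ... mark the recv operation as completed ...
  } else {
    // NOTIFY_SEND_READY / NOTIFY_RECV_READY carry no payload; only
    // bookkeeping of the remote side's pending operations follows.
  }
}

In the real code this happens across two libuv callbacks rather than one blocking call: the first onRead invocation delivers the preamble and arms the payload read, and the second delivers the payload and finalizes the operation.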