in tensorpipe/transport/ibv/connection_impl.cc [71:159]
void ConnectionImpl::initImplFromLoop() {
context_->enroll(*this);
Error error;
// The connection either got a socket or an address, but not both.
TP_DCHECK(socket_.hasValue() ^ sockaddr_.has_value());
if (!socket_.hasValue()) {
std::tie(error, socket_) =
Socket::createForFamily(sockaddr_->addr()->sa_family);
if (error) {
setError(std::move(error));
return;
}
error = socket_.reuseAddr(true);
if (error) {
setError(std::move(error));
return;
}
error = socket_.connect(sockaddr_.value());
if (error) {
setError(std::move(error));
return;
}
}
// Ensure underlying control socket is non-blocking such that it
// works well with event driven I/O.
error = socket_.block(false);
if (error) {
setError(std::move(error));
return;
}
// Create ringbuffer for inbox.
std::tie(error, inboxBuf_) = MmappedPtr::create(
kBufferSize, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1);
TP_THROW_ASSERT_IF(error)
<< "Couldn't allocate ringbuffer for connection inbox: " << error.what();
inboxRb_ =
RingBuffer<kNumInboxRingbufferRoles>(&inboxHeader_, inboxBuf_.ptr());
inboxMr_ = createIbvMemoryRegion(
context_->getReactor().getIbvLib(),
context_->getReactor().getIbvPd(),
inboxBuf_.ptr(),
kBufferSize,
IbvLib::ACCESS_LOCAL_WRITE | IbvLib::ACCESS_REMOTE_WRITE);
// Create ringbuffer for outbox.
std::tie(error, outboxBuf_) = MmappedPtr::create(
kBufferSize, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1);
TP_THROW_ASSERT_IF(error)
<< "Couldn't allocate ringbuffer for connection outbox: " << error.what();
outboxRb_ =
RingBuffer<kNumOutboxRingbufferRoles>(&outboxHeader_, outboxBuf_.ptr());
outboxMr_ = createIbvMemoryRegion(
context_->getReactor().getIbvLib(),
context_->getReactor().getIbvPd(),
outboxBuf_.ptr(),
kBufferSize,
0);
// Create and init queue pair.
{
IbvLib::qp_init_attr initAttr;
std::memset(&initAttr, 0, sizeof(initAttr));
initAttr.qp_type = IbvLib::QPT_RC;
initAttr.send_cq = context_->getReactor().getIbvCq().get();
initAttr.recv_cq = context_->getReactor().getIbvCq().get();
initAttr.cap.max_send_wr = kSendQueueSize;
initAttr.cap.max_send_sge = 1;
initAttr.srq = context_->getReactor().getIbvSrq().get();
initAttr.sq_sig_all = 1;
qp_ = createIbvQueuePair(
context_->getReactor().getIbvLib(),
context_->getReactor().getIbvPd(),
initAttr);
}
transitionIbvQueuePairToInit(
context_->getReactor().getIbvLib(),
qp_,
context_->getReactor().getIbvAddress());
// Register methods to be called when our peer writes to our inbox and reads
// from our outbox.
context_->getReactor().registerQp(qp_->qp_num, shared_from_this());
// We're sending address first, so wait for writability.
state_ = SEND_ADDR;
context_->registerDescriptor(socket_.fd(), EPOLLOUT, shared_from_this());
}