in tensorpipe/channel/cuda_gdr/channel_impl.cc [86:153]
void ChannelImpl::onReadHandshakeNumNics(
const HandshakeNumNics& nopHandshakeNumNics) {
TP_DCHECK(context_->inLoop());
TP_DCHECK_EQ(state_, WAITING_FOR_HANDSHAKE_NUM_NICS);
TP_DCHECK(!error_);
numRemoteNics_ = nopHandshakeNumNics.numNics;
std::vector<std::vector<NopIbvSetupInformation>> allSetupInfo;
queuePairs_.resize(numLocalNics_);
allSetupInfo.resize(numLocalNics_);
for (size_t localNicIdx = 0; localNicIdx < numLocalNics_; localNicIdx++) {
queuePairs_[localNicIdx].resize(numRemoteNics_);
allSetupInfo[localNicIdx].resize(numRemoteNics_);
IbvNic& localNic = context_->getIbvNic(localNicIdx);
for (size_t remoteNicIdx = 0; remoteNicIdx < numRemoteNics_;
remoteNicIdx++) {
IbvLib::qp_init_attr initAttr;
std::memset(&initAttr, 0, sizeof(initAttr));
initAttr.qp_type = IbvLib::QPT_RC;
initAttr.send_cq = localNic.getIbvCq().get();
initAttr.recv_cq = localNic.getIbvCq().get();
initAttr.cap.max_send_wr = kNumSends;
initAttr.cap.max_send_sge = 1;
initAttr.cap.max_recv_wr = kNumRecvs;
initAttr.cap.max_recv_sge = 1;
initAttr.sq_sig_all = 1;
IbvQueuePair qp = createIbvQueuePair(
context_->getIbvLib(), localNic.getIbvPd(), initAttr);
transitionIbvQueuePairToInit(
context_->getIbvLib(), qp, localNic.getIbvAddress());
IbvSetupInformation setupInfo =
makeIbvSetupInformation(localNic.getIbvAddress(), qp);
// The maximum message size will be filled in later.
queuePairs_[localNicIdx][remoteNicIdx] =
QueuePair{std::move(qp), /*maximumMessageSize=*/0};
allSetupInfo[localNicIdx][remoteNicIdx].fromIbvSetupInformation(
setupInfo);
}
}
auto nopHolderOut = std::make_shared<NopHolder<HandshakeSetupInfo>>();
HandshakeSetupInfo& nopHandshakeSetupInfo = nopHolderOut->getObject();
nopHandshakeSetupInfo.setupInfo = std::move(allSetupInfo);
TP_VLOG(6) << "Channel " << id_ << " is writing nop object (handshake two)";
readyToReceiveConnection_->write(
*nopHolderOut, callbackWrapper_([nopHolderOut](ChannelImpl& impl) {
TP_VLOG(6) << "Channel " << impl.id_
<< " done writing nop object (handshake two)";
}));
auto nopHolderIn = std::make_shared<NopHolder<HandshakeSetupInfo>>();
TP_VLOG(6) << "Channel " << id_ << " is reading nop object (handshake two)";
readyToReceiveConnection_->read(
*nopHolderIn, callbackWrapper_([nopHolderIn](ChannelImpl& impl) {
TP_VLOG(6) << "Channel " << impl.id_
<< " done reading nop object (handshake two)";
if (!impl.error_) {
impl.onReadHandshakeSetupInfo(nopHolderIn->getObject());
}
}));
state_ = WAITING_FOR_HANDSHAKE_SETUP_INFO;
}