in tensorpipe/channel/cuda_gdr/context_impl.cc [285:347]
// Polls the completion queue once, retrieving up to kNumPolledWorkCompletions
// work completions. For each one it looks up the matching in-flight request,
// removes it, and invokes its callback (with an IbvError built from the status
// string on failure, or Error::kSuccess otherwise). Afterwards the send/recv
// slots freed by this batch are handed to requests that were queued waiting
// for a slot.
//
// Returns false if the completion queue had nothing to report, true otherwise.
// Throws a system error if poll_cq fails, and asserts if a completion refers
// to an unknown request ID or the recorded opcode is neither SEND nor RECV.
bool IbvNic::pollOnce() {
  std::array<IbvLib::wc, kNumPolledWorkCompletions> wcs;
  auto rv = ibvLib_.poll_cq(cq_.get(), wcs.size(), wcs.data());

  if (rv == 0) {
    return false;
  }
  TP_THROW_SYSTEM_IF(rv < 0, errno);

  int numSends = 0;
  int numRecvs = 0;
  for (int wcIdx = 0; wcIdx < rv; wcIdx++) {
    IbvLib::wc& wc = wcs[wcIdx];

    // NOTE: per the ibv_poll_cq documentation, wc.opcode and wc.byte_len are
    // only defined for successful completions; for failed ones they are
    // printed here purely as debugging aid and may be garbage.
    TP_VLOG(6) << "Channel context " << id_ << " got work completion on device "
               << name_ << " for request " << wc.wr_id << " for QP "
               << wc.qp_num << " with status "
               << ibvLib_.wc_status_str(wc.status) << " and opcode "
               << ibvWorkCompletionOpcodeToStr(wc.opcode)
               << " (byte length: " << wc.byte_len << ")";

    auto iter = requestsInFlight_.find(wc.wr_id);
    TP_THROW_ASSERT_IF(iter == requestsInFlight_.end())
        << "Got work completion with unknown ID " << wc.wr_id;

    // Classify the request by the opcode recorded when it was posted rather
    // than by wc.opcode, which is not reliable for failed completions.
    // (The opcode is a plain enum: copying it is all std::move would do.)
    IbvLib::wc_opcode opcode = std::get<0>(iter->second);
    std::function<void(const Error&)> cb = std::move(std::get<1>(iter->second));
    requestsInFlight_.erase(iter);

    // Validate and account for the request before running user code, so that
    // a corrupted entry trips the assert before any callback is fired for it.
    switch (opcode) {
      case IbvLib::WC_RECV:
        numRecvs++;
        break;
      case IbvLib::WC_SEND:
        numSends++;
        break;
      default:
        TP_THROW_ASSERT() << "Unknown opcode: " << opcode;
    }

    if (wc.status != IbvLib::WC_SUCCESS) {
      cb(TP_CREATE_ERROR(IbvError, ibvLib_.wc_status_str(wc.status)));
    } else {
      cb(Error::kSuccess);
    }
  }

  // Freed slots are returned only once the whole batch has been processed,
  // and are then consumed by the queued requests in FIFO order for as long
  // as slots remain available.
  numAvailableSendSlots_ += numSends;
  while (!sendsWaitingForSlots_.empty() && numAvailableSendSlots_ > 0) {
    applyFunc(
        *this, &IbvNic::postSend, std::move(sendsWaitingForSlots_.front()));
    sendsWaitingForSlots_.pop_front();
  }

  numAvailableRecvSlots_ += numRecvs;
  while (!recvsWaitingForSlots_.empty() && numAvailableRecvSlots_ > 0) {
    applyFunc(
        *this, &IbvNic::postRecv, std::move(recvsWaitingForSlots_.front()));
    recvsWaitingForSlots_.pop_front();
  }

  return true;
}