in tensorpipe/transport/ibv/reactor.cc [80:155]
bool Reactor::pollOnce() {
std::array<IbvLib::wc, kNumPolledWorkCompletions> wcs;
auto rv = getIbvLib().poll_cq(cq_.get(), wcs.size(), wcs.data());
if (rv == 0) {
return false;
}
TP_THROW_SYSTEM_IF(rv < 0, errno);
int numRecvs = 0;
int numWrites = 0;
int numAcks = 0;
for (int wcIdx = 0; wcIdx < rv; wcIdx++) {
IbvLib::wc& wc = wcs[wcIdx];
TP_VLOG(9) << "Transport context " << id_
<< " got work completion for request " << wc.wr_id << " for QP "
<< wc.qp_num << " with status "
<< getIbvLib().wc_status_str(wc.status) << " and opcode "
<< ibvWorkCompletionOpcodeToStr(wc.opcode)
<< " (byte length: " << wc.byte_len
<< ", immediate data: " << wc.imm_data << ")";
auto iter = queuePairEventHandler_.find(wc.qp_num);
TP_THROW_ASSERT_IF(iter == queuePairEventHandler_.end())
<< "Got work completion for unknown queue pair " << wc.qp_num;
if (wc.status != IbvLib::WC_SUCCESS) {
iter->second->onError(wc.status, wc.wr_id);
continue;
}
switch (wc.opcode) {
case IbvLib::WC_RECV_RDMA_WITH_IMM:
TP_THROW_ASSERT_IF(!(wc.wc_flags & IbvLib::WC_WITH_IMM));
iter->second->onRemoteProducedData(wc.imm_data);
numRecvs++;
break;
case IbvLib::WC_RECV:
TP_THROW_ASSERT_IF(!(wc.wc_flags & IbvLib::WC_WITH_IMM));
iter->second->onRemoteConsumedData(wc.imm_data);
numRecvs++;
break;
case IbvLib::WC_RDMA_WRITE:
iter->second->onWriteCompleted();
numWrites++;
break;
case IbvLib::WC_SEND:
iter->second->onAckCompleted();
numAcks++;
break;
default:
TP_THROW_ASSERT() << "Unknown opcode: " << wc.opcode;
}
}
postRecvRequestsOnSRQ(numRecvs);
numAvailableWrites_ += numWrites;
while (!pendingQpWrites_.empty() && numAvailableWrites_ > 0) {
postWrite(
std::get<0>(pendingQpWrites_.front()),
std::get<1>(pendingQpWrites_.front()));
pendingQpWrites_.pop_front();
}
numAvailableAcks_ += numAcks;
while (!pendingQpAcks_.empty() && numAvailableAcks_ > 0) {
postAck(
std::get<0>(pendingQpAcks_.front()),
std::get<1>(pendingQpAcks_.front()));
pendingQpAcks_.pop_front();
}
return true;
}