bool Reactor::pollOnce()

in tensorpipe/transport/ibv/reactor.cc [80:155]


bool Reactor::pollOnce() {
  std::array<IbvLib::wc, kNumPolledWorkCompletions> wcs;
  auto rv = getIbvLib().poll_cq(cq_.get(), wcs.size(), wcs.data());

  if (rv == 0) {
    return false;
  }
  TP_THROW_SYSTEM_IF(rv < 0, errno);

  int numRecvs = 0;
  int numWrites = 0;
  int numAcks = 0;
  for (int wcIdx = 0; wcIdx < rv; wcIdx++) {
    IbvLib::wc& wc = wcs[wcIdx];

    TP_VLOG(9) << "Transport context " << id_
               << " got work completion for request " << wc.wr_id << " for QP "
               << wc.qp_num << " with status "
               << getIbvLib().wc_status_str(wc.status) << " and opcode "
               << ibvWorkCompletionOpcodeToStr(wc.opcode)
               << " (byte length: " << wc.byte_len
               << ", immediate data: " << wc.imm_data << ")";

    auto iter = queuePairEventHandler_.find(wc.qp_num);
    TP_THROW_ASSERT_IF(iter == queuePairEventHandler_.end())
        << "Got work completion for unknown queue pair " << wc.qp_num;

    if (wc.status != IbvLib::WC_SUCCESS) {
      iter->second->onError(wc.status, wc.wr_id);
      continue;
    }

    switch (wc.opcode) {
      case IbvLib::WC_RECV_RDMA_WITH_IMM:
        TP_THROW_ASSERT_IF(!(wc.wc_flags & IbvLib::WC_WITH_IMM));
        iter->second->onRemoteProducedData(wc.imm_data);
        numRecvs++;
        break;
      case IbvLib::WC_RECV:
        TP_THROW_ASSERT_IF(!(wc.wc_flags & IbvLib::WC_WITH_IMM));
        iter->second->onRemoteConsumedData(wc.imm_data);
        numRecvs++;
        break;
      case IbvLib::WC_RDMA_WRITE:
        iter->second->onWriteCompleted();
        numWrites++;
        break;
      case IbvLib::WC_SEND:
        iter->second->onAckCompleted();
        numAcks++;
        break;
      default:
        TP_THROW_ASSERT() << "Unknown opcode: " << wc.opcode;
    }
  }

  postRecvRequestsOnSRQ(numRecvs);

  numAvailableWrites_ += numWrites;
  while (!pendingQpWrites_.empty() && numAvailableWrites_ > 0) {
    postWrite(
        std::get<0>(pendingQpWrites_.front()),
        std::get<1>(pendingQpWrites_.front()));
    pendingQpWrites_.pop_front();
  }

  numAvailableAcks_ += numAcks;
  while (!pendingQpAcks_.empty() && numAvailableAcks_ > 0) {
    postAck(
        std::get<0>(pendingQpAcks_.front()),
        std::get<1>(pendingQpAcks_.front()));
    pendingQpAcks_.pop_front();
  }

  return true;
}