bool RdmaMgr::ConnectivityCheck()

in tensorflow_networking/verbs/rdma_mgr.cc [134:180]


bool RdmaMgr::ConnectivityCheck() {
  int i, rcnt = 0, scnt = 0;

  for (const auto& p : channel_table_) {
    string worker_name = p.first;
    RdmaChannel* rc = p.second;

    VLOG(2) << "Ping to " << worker_name;
    CHECK(rc->PingPostSend() == 0) << "Couldn't post send  to " << worker_name
                                   << " with error: " << std::strerror(errno);
    for (i = 0; i < rc->adapter_->params_.queue_depth - 1; i++) {
      rc->Recv();
    }
  }

  while (rcnt < num_remote_workers_ || scnt < num_remote_workers_) {
    int ne;
    do {
      ne = ibv_poll_cq(rdma_adapter_->cq_, 2 * num_remote_workers_,
                       rdma_adapter_->wc_);
      CHECK(ne >= 0) << "poll CQ failed " << ne << "with error"
                     << std::strerror(errno);
    } while (ne < 1);

    for (i = 0; i < ne; ++i) {
      ibv_wc_status s = rdma_adapter_->wc_[i].status;
      // recv complete
      if ((int)rdma_adapter_->wc_[i].wr_id == RdmaChannel::kPingRecvWrid) {
        CHECK(s == IBV_WC_SUCCESS)
            << ": " << ibv_wc_status_str(rdma_adapter_->wc_[i].status) << "("
            << rdma_adapter_->wc_[i].status << ") for PING_RECV_WRID";
        ++rcnt;
        // send complete
      } else {
        RdmaChannel* rc =
            reinterpret_cast<RdmaChannel*>(rdma_adapter_->wc_[i].wr_id);
        CHECK(s == IBV_WC_SUCCESS)
            << ": " << ibv_wc_status_str(rdma_adapter_->wc_[i].status) << "("
            << rdma_adapter_->wc_[i].status << ") to " << rc->remote_name_;
        ++scnt;
      }
    }  // for
  }    // while
  CHECK(rcnt == scnt) << "Connectivity check failed!";
  rdma_adapter_->StartPolling();
  return (num_remote_workers_ == rcnt) && (num_remote_workers_ == scnt);
}