in tensorflow_networking/verbs/rdma_mgr.cc [134:180]
bool RdmaMgr::ConnectivityCheck() {
int i, rcnt = 0, scnt = 0;
for (const auto& p : channel_table_) {
string worker_name = p.first;
RdmaChannel* rc = p.second;
VLOG(2) << "Ping to " << worker_name;
CHECK(rc->PingPostSend() == 0) << "Couldn't post send to " << worker_name
<< " with error: " << std::strerror(errno);
for (i = 0; i < rc->adapter_->params_.queue_depth - 1; i++) {
rc->Recv();
}
}
while (rcnt < num_remote_workers_ || scnt < num_remote_workers_) {
int ne;
do {
ne = ibv_poll_cq(rdma_adapter_->cq_, 2 * num_remote_workers_,
rdma_adapter_->wc_);
CHECK(ne >= 0) << "poll CQ failed " << ne << "with error"
<< std::strerror(errno);
} while (ne < 1);
for (i = 0; i < ne; ++i) {
ibv_wc_status s = rdma_adapter_->wc_[i].status;
// recv complete
if ((int)rdma_adapter_->wc_[i].wr_id == RdmaChannel::kPingRecvWrid) {
CHECK(s == IBV_WC_SUCCESS)
<< ": " << ibv_wc_status_str(rdma_adapter_->wc_[i].status) << "("
<< rdma_adapter_->wc_[i].status << ") for PING_RECV_WRID";
++rcnt;
// send complete
} else {
RdmaChannel* rc =
reinterpret_cast<RdmaChannel*>(rdma_adapter_->wc_[i].wr_id);
CHECK(s == IBV_WC_SUCCESS)
<< ": " << ibv_wc_status_str(rdma_adapter_->wc_[i].status) << "("
<< rdma_adapter_->wc_[i].status << ") to " << rc->remote_name_;
++scnt;
}
} // for
} // while
CHECK(rcnt == scnt) << "Connectivity check failed!";
rdma_adapter_->StartPolling();
return (num_remote_workers_ == rcnt) && (num_remote_workers_ == scnt);
}