in gloo/transport/ibverbs/buffer.cc [116:168]
// Block until a send posted from this buffer has completed, then consume
// exactly one recorded send completion (decrements sendCompletions_).
//
// If no completion has been recorded yet, enforces that at least one send
// is pending (sendPending_ > 0). When the pair's timeout (getTimeout()) is
// not kNoTimeout and it elapses before a completion arrives, the failure is
// reported via pair_->signalIoFailure(), which is expected to throw.
void Buffer::waitSend() {
  auto timeout = pair_->getTimeout();
  if (pair_->sync_) {
    // In synchronous mode the current thread is responsible for polling
    // for work completions. We can assume a single pair is never used by
    // more than one thread, so there is no need to acquire the mutex here.
    if (sendCompletions_ == 0) {
      GLOO_ENFORCE_GT(sendPending_, 0, "No send to wait for");
      auto start = std::chrono::steady_clock::now();
      while (sendCompletions_ == 0) {
        pair_->pollCompletions();
        // Only treat an elapsed deadline as a timeout if the poll above did
        // not deliver the completion. This matches the asynchronous branch,
        // where wait_for() with a predicate reports success if the
        // predicate holds when the deadline expires.
        if (sendCompletions_ > 0) {
          break;
        }
        if (timeout != kNoTimeout &&
            (std::chrono::steady_clock::now() - start) >= timeout) {
          pair_->signalIoFailure(
              GLOO_ERROR_MSG("Send timeout ", pair_->peer().str()));
          GLOO_ENFORCE(false, "Unexpected code path");
        }
      }
    }
    sendCompletions_--;
  } else {
    // The device thread will signal completion. If the completion
    // hasn't arrived yet, wait until it does.
    std::unique_lock<std::mutex> lock(m_);
    // NOTE(review): checkErrorState() presumably throws if an error has
    // been recorded for this buffer — confirm against its definition.
    checkErrorState();
    if (sendCompletions_ == 0) {
      GLOO_ENFORCE_GT(sendPending_, 0, "No send to wait for");
      // Re-check the error state on every wakeup so a failure signaled
      // while waiting is surfaced instead of waiting indefinitely.
      auto pred = [&]{
        checkErrorState();
        return sendCompletions_ > 0;
      };
      if (timeout == kNoTimeout) {
        // No timeout set. Wait for the send to complete.
        sendCv_.wait(lock, pred);
      } else {
        auto done = sendCv_.wait_for(lock, timeout, pred);
        if (!done) {
          // Release the mutex before calling into the pair to avoid deadlock.
          // Calling signalIoFailure() will throw, so no need to
          // reacquire.
          lock.unlock();
          pair_->signalIoFailure(
              GLOO_ERROR_MSG("Send timeout ", pair_->peer().str()));
          GLOO_ENFORCE(false, "Unexpected code path");
        }
      }
    }
    sendCompletions_--;
  }
}