in gloo/rendezvous/context.cc [181:243]
// Builds a new, fully connected context on top of `dev`, using the factory's
// pre-created buffers (sendBuffers_/recvBuffers_ and the notification
// buffers) to exchange pair addresses with every other rank.
//
// The new context copies rank, size and timeout from backingContext_. For
// every peer a transport pair is created and its address is sent to that
// peer; the peer's address is then received and used to connect the pair.
// A final notification round trip lets both sides know the address payload
// has been consumed before this function returns.
//
// @param dev  Transport device the new context's pairs are created on.
// @return     The new context, upcast to ::gloo::Context.
//
// Fails a GLOO_ENFORCE_LE check if a pair address does not fit in the
// per-peer send or receive scratch data.
std::shared_ptr<::gloo::Context> ContextFactory::makeContext(
    std::shared_ptr<transport::Device>& dev) {
  auto context = std::make_shared<Context>(
      backingContext_->rank,
      backingContext_->size);
  context->setTimeout(backingContext_->getTimeout());

  // Assume it's the same for all pairs on a device
  size_t addressSize = 0;

  // Create pairs
  auto transportContext = dev->createContext(context->rank, context->size);
  transportContext->setTimeout(context->getTimeout());
  for (auto i = 0; i < context->size; i++) {
    if (i == context->rank) {
      continue;
    }

    auto& pair = transportContext->createPair(i);
    auto address = pair->address().bytes();
    addressSize = address.size();

    // Send address of new pair to peer.
    //
    // The bytes are copied in place rather than with assign(): the check
    // below treats sendData_[i].size() as the capacity of the scratch
    // buffer, and assign() would shrink that size to the current address
    // length, so a later makeContext() call with a longer address would be
    // rejected. assign() also formally invalidates pointers into the vector.
    // NOTE(review): sendBuffers_[i] presumably refers to sendData_[i]'s
    // storage; confirm against the factory constructor.
    GLOO_ENFORCE_LE(addressSize, sendData_[i].size());
    auto out = sendData_[i].begin();
    for (auto in = address.begin(); in != address.end(); ++in, ++out) {
      *out = *in;
    }
    sendBuffers_[i]->send(0, addressSize);
  }

  // Wait for remote addresses and connect peers
  for (auto i = 0; i < context->size; i++) {
    if (i == context->rank) {
      continue;
    }

    recvBuffers_[i]->waitRecv();
    auto& data = recvData_[i];

    // The remote address is assumed to have the same length as the local
    // one; make sure that many bytes are actually available before slicing.
    GLOO_ENFORCE_LE(addressSize, data.size());
    auto address = std::vector<char>(data.begin(), data.begin() + addressSize);
    transportContext->getPair(i)->connect(address);

    // Notify peer that we've consumed the payload
    sendNotificationBuffers_[i]->send();
  }

  // Wait for incoming notification from peers
  for (auto i = 0; i < context->size; i++) {
    if (i == context->rank) {
      continue;
    }
    recvNotificationBuffers_[i]->waitRecv();
  }

  // Wait for outgoing notifications to be flushed
  for (auto i = 0; i < context->size; i++) {
    if (i == context->rank) {
      continue;
    }
    sendNotificationBuffers_[i]->waitSend();
  }

  context->device_ = dev;
  context->transportContext_ = std::move(transportContext);
  return std::static_pointer_cast<::gloo::Context>(context);
}