/**
* Copyright (c) 2018-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#include "gloo/cuda_allreduce_bcube.h"
#include "gloo/cuda_collectives_device.h"
#include "gloo/cuda_collectives_host.h"
#include "gloo/cuda_private.h"
#include <cmath>
#include <iomanip>
#include <iostream>
#include <sstream>
#ifndef _WIN32
#include <unistd.h>
#endif
#ifdef DEBUG
#define DEBUG_PRINT_STAGE(stage) \
do { \
printStageBuffer(stage); \
} while (false)
#define DEBUG_PRINT_SEND(stage) \
do { \
printStepBuffer( \
stage, step, myRank_, destRank, &scratch_[0], sendCount, ptrOffset); \
} while (false)
#define DEBUG_PRINT_RECV(stage) \
do { \
printStepBuffer( \
stage, \
step, \
srcRank, \
myRank_, \
&recvBufs_[recvBufIdx_[srcRank]][0], \
recvCount); \
} while (false)
#else
#define DEBUG_PRINT_STAGE(stage)
#define DEBUG_PRINT_SEND(stage)
#define DEBUG_PRINT_RECV(stage)
#endif
namespace gloo {
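/*
 * Bcube is a hypercube-style allreduce: at every step, the nodes_
 * processes are organized into groups of base_ peers whose ranks are
 * base_^step apart. The algorithm runs a reduce-scatter over
 * steps_ = ceil(log_base(nodes)) steps, leaving each rank with a fully
 * reduced chunk of the buffer, followed by an all-gather over the same
 * steps in reverse so every rank ends up with the complete result.
 */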
template <typename T, typename W>
CudaAllreduceBcube<T, W>::CudaAllreduceBcube(
const std::shared_ptr<Context>& context,
const std::vector<T*>& ptrs,
const int count,
const std::vector<cudaStream_t>& streams,
const CudaReductionFunction<T>* fn)
: Algorithm(context),
myRank_(this->context_->rank),
base_(this->context_->base ? this->context_->base : 2),
nodes_(this->contextSize_),
totalNumElems_(count),
bytes_(totalNumElems_ * sizeof(T)),
steps_(computeSteps(nodes_, base_)),
fn_(fn),
recvBufs_(steps_ * base_) {
auto newStream = true;
if (streams.size() > 0) {
GLOO_ENFORCE_EQ(streams.size(), ptrs.size());
newStream = false;
}
  for (size_t i = 0; i < ptrs.size(); i++) {
auto ptr = CudaDevicePointer<T>::create(ptrs[i], totalNumElems_);
if (newStream) {
streams_.push_back(CudaStream(ptr.getDeviceID()));
} else {
streams_.push_back(CudaStream(ptr.getDeviceID(), streams[i]));
}
devicePtrs_.push_back(std::move(ptr));
}
// Workspace specific initialization (see below)
init();
if (nodes_ == 1) {
return;
}
setupNodes();
/*
* Reserve max needed number of context slots. Up to 2 slots per process
* pair are needed (one for regular sends and one for notifications). For
* simplicity, the same mapping is used on all processes so that the slots
* trivially match across processes
*/
  int slotOffset = this->context_->nextSlot(
      2 * this->contextSize_ * (this->contextSize_ - 1));
int bufIdx = 0;
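  /*
   * For every step and peer pair, set up: a send buffer exposing the
   * whole scratch space (individual sends address a sub-range of it by
   * offset), a receive buffer sized for the larger of the two chunks the
   * pair may exchange, and a pair of notification buffers on the
   * adjacent slot. The slot index depends only on the unordered pair of
   * ranks, so both endpoints compute the same value.
   */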
for (int step = 0; step < steps_; ++step) {
for (int destRank : getPeersPerStep(myRank_, step)) {
int recvSize = std::max(
getNumElemsPerStep(myRank_, step),
getNumElemsPerStep(destRank, step));
auto& pair = this->context_->getPair(destRank);
      auto slot = slotOffset +
2 * (std::min(myRank_, destRank) * nodes_ +
std::max(myRank_, destRank));
sendDataBufs_[destRank] = pair->createSendBuffer(slot, *scratch_, bytes_);
recvBufs_[bufIdx] = W::Pointer::alloc(recvSize);
recvDataBufs_[destRank] = pair->createRecvBuffer(
slot, &recvBufs_[bufIdx][0], recvSize * sizeof(T));
recvBufIdx_[destRank] = bufIdx;
++bufIdx;
++slot;
sendNotificationBufs_[destRank] =
pair->createSendBuffer(slot, &dummy_, sizeof(dummy_));
recvNotificationBufs_[destRank] =
pair->createRecvBuffer(slot, &dummy_, sizeof(dummy_));
} // nodes
} // steps
}
template <typename T, typename W>
void CudaAllreduceBcube<T, W>::run() {
CudaDeviceGuard guard;
CudaStream& stream = *scratchStream_;
  // localReduceOp_ may be unset when a device workspace wraps a single
  // device pointer (see init below); scratch_ then aliases the input.
  if (localReduceOp_) {
    localReduceOp_->run();
  }
if (nodes_ == 1) {
GLOO_ENFORCE(
localBroadcastOp_,
"localBroadcastOp must be initialized for single machine");
localBroadcastOp_->run();
return;
}
// Reduce-scatter
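  /*
   * At each step, first send each peer the chunk it is responsible for
   * (read out of scratch_ at that peer's offset), then receive every
   * peer's contribution to my own chunk and reduce it into scratch_
   * with fn_.
   */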
DEBUG_PRINT_STAGE("start");
for (int step = 0; step < steps_; ++step) {
const auto& peerRanks = getPeersPerStep(myRank_, step);
for (int destRank : peerRanks) {
int sendCount = getNumElemsPerStep(destRank, step);
int ptrOffset = getPtrOffsetPerStep(destRank, step);
DEBUG_PRINT_SEND("reduce-scatter");
sendDataBufs_[destRank]->send(
ptrOffset * sizeof(T), sendCount * sizeof(T));
} // sends within group
for (int srcRank : peerRanks) {
int recvCount = getNumElemsPerStep(myRank_, step);
int ptrOffset = getPtrOffsetPerStep(myRank_, step);
recvDataBufs_[srcRank]->waitRecv();
DEBUG_PRINT_RECV("reduce-scatter");
auto recvBufAtOffset =
recvBufs_[recvBufIdx_[srcRank]].range(0, recvCount);
auto scratchAtOffset = scratch_.range(ptrOffset, recvCount);
fn_->call(scratchAtOffset, recvBufAtOffset, recvCount, stream);
stream.wait();
/*
* Send notification to the pair we just received from that
* we're done dealing with the receive buffer.
*/
sendNotificationBufs_[srcRank]->send();
} // recvs within group and reduces
} // reduce-scatter steps
DEBUG_PRINT_STAGE("reduce-scattered");
// All-gather
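  /*
   * Walk the steps in reverse. Each rank now owns a fully reduced chunk;
   * at every step it redistributes that chunk to its peers and copies
   * the peers' chunks into scratch_, so the complete result is assembled
   * on every rank.
   */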
for (int step = steps_ - 1; step >= 0; --step) {
const auto& peerRanks = getPeersPerStep(myRank_, step);
for (int destRank : peerRanks) {
int sendCount = getNumElemsPerStep(myRank_, step);
int ptrOffset = getPtrOffsetPerStep(myRank_, step);
/*
* Wait for notification from the peer to make sure we can send data
* without risking any overwrites in its receive buffer.
*/
recvNotificationBufs_[destRank]->waitRecv();
DEBUG_PRINT_SEND("all-gather");
sendDataBufs_[destRank]->send(
ptrOffset * sizeof(T), sendCount * sizeof(T));
}
for (int srcRank : peerRanks) {
int recvCount = getNumElemsPerStep(srcRank, step);
int ptrOffset = getPtrOffsetPerStep(srcRank, step);
recvDataBufs_[srcRank]->waitRecv();
DEBUG_PRINT_RECV("all-gather");
auto recvBufAtOffset =
recvBufs_[recvBufIdx_[srcRank]].range(0, recvCount);
auto scratchAtOffset = scratch_.range(ptrOffset, recvCount);
stream.copyAsync(scratchAtOffset, recvBufAtOffset);
stream.wait();
if (step == 0) {
/*
* Send notification to the pair we just received from that
         * we're done dealing with the receive buffer.
*/
sendNotificationBufs_[srcRank]->send();
}
} // recvs within group and reduces
} // all-gather steps
DEBUG_PRINT_STAGE("all-reduced");
  if (localBroadcastOp_) {
    localBroadcastOp_->runAsync();
    localBroadcastOp_->wait();
  }
  /*
   * Wait for notifications from our peers within the block to make sure
   * that, on the next run, we can send data immediately without risking
   * overwriting data in their receive buffers before they have consumed
   * it.
   */
for (int peerRank : getPeersPerStep(myRank_, 0)) {
recvNotificationBufs_[peerRank]->waitRecv();
}
}
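// Number of communication steps: ceil(log_base(nodes)), i.e. how many
// rounds of groups of `peers` ranks are needed to span all nodes.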
template <typename T, typename W>
int CudaAllreduceBcube<T, W>::computeSteps(int nodes, int peers) {
  const float lg2n = std::log2(nodes);
  const float lg2p = std::log2(peers);
  return static_cast<int>(std::ceil(lg2n / lg2p));
}
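/*
 * Debug hook used by the DEBUG_PRINT_* macros: return true for the ranks
 * whose buffers should be dumped. Disabled by default.
 */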
template <typename T, typename W>
bool CudaAllreduceBcube<T, W>::printCheck(int /* rank */) {
return false;
}
template <typename T, typename W>
void CudaAllreduceBcube<T, W>::printBreak(T* p, int x) {
if (0 == x % wordsPerLine) {
std::cout << std::endl
<< &p[x] << " " << std::setfill('0') << std::setw(5) << x << ": ";
} else if (0 == x % wordsPerSection) {
std::cout << "- ";
}
}
template <typename T, typename W>
void CudaAllreduceBcube<T, W>::printElems(T* p, int count, int start) {
auto alignedStart = (start / wordsPerLine) * wordsPerLine;
for (int x = alignedStart; x < start + count; ++x) {
printBreak(p, x);
if (x < start) {
std::cout << "..... ";
} else {
std::cout << std::setfill('0') << std::setw(5) << p[x] << " ";
}
}
}
template <typename T, typename W>
void CudaAllreduceBcube<T, W>::printStageBuffer(const std::string& msg) {
if (printCheck(myRank_)) {
std::cout << "rank (" << myRank_ << ") " << msg << ": ";
printElems(&scratch_[0], totalNumElems_);
std::cout << std::endl;
}
}
template <typename T, typename W>
void CudaAllreduceBcube<T, W>::printStepBuffer(
const std::string& stage,
int step,
int srcRank,
int destRank,
T* p,
int count,
int start) {
if (printCheck(myRank_)) {
std::cout << stage << ": step (" << step << ") "
<< "srcRank (" << srcRank << ") -> "
<< "destRank (" << destRank << "): ";
printElems(p, count, start);
std::cout << std::endl;
}
}
template <typename T, typename W>
const std::vector<int>& CudaAllreduceBcube<T, W>::getPeersPerStep(
int rank,
int step) {
return allNodes_[rank].getPeersPerStep(step);
}
template <typename T, typename W>
int CudaAllreduceBcube<T, W>::getNumElemsPerStep(int rank, int step) {
return allNodes_[rank].getNumElemsPerStep(step);
}
template <typename T, typename W>
int CudaAllreduceBcube<T, W>::getPtrOffsetPerStep(int rank, int step) {
return allNodes_[rank].getPtrOffsetPerStep(step);
}
template <typename T, typename W>
void CudaAllreduceBcube<T, W>::createNodes() {
for (int rank = 0; rank < nodes_; ++rank) {
allNodes_.emplace_back(rank, steps_);
}
}
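/*
 * Splits a group's element range evenly across its members and records,
 * for this step, each member's peers, chunk size, and pointer offset.
 */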
template <typename T, typename W>
void CudaAllreduceBcube<T, W>::updateGroupNodes(
int step,
const cuda::bcube::Group& group) {
const std::vector<int>& peers = group.getNodeRanks();
const int peersSz = peers.size();
int ptrOffset = group.getPtrOffset();
int count = group.getNumElems() / peersSz;
const int countRem = group.getNumElems() % peersSz;
if (0 == count) {
count = 1;
}
for (int i = 0; i < peersSz; ++i) {
cuda::bcube::Node& node = allNodes_[peers[i]];
if (peersSz - 1 != i) { // if not the last node in group
node.setPerStepAttributes(step, peers, count, ptrOffset);
ptrOffset += count;
} else {
      /*
       * The last node gets the remainder elements when the number of
       * elements is not exactly divisible by the number of peers.
       */
node.setPerStepAttributes(step, peers, count + countRem, ptrOffset);
ptrOffset += count + countRem;
}
ptrOffset %= totalNumElems_;
}
}
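/*
 * Builds the per-step communication plan for every rank. At step s,
 * group members are peerDistance = base_^s ranks apart; any node that
 * has not yet been assigned peers for the step starts a new group.
 */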
template <typename T, typename W>
void CudaAllreduceBcube<T, W>::setupNodes() {
// Create all the nodes upfront
createNodes();
// Now we actually try to set up the nodes
int peerDistance = 1;
for (int step = 0; step < steps_; ++step) {
std::vector<cuda::bcube::Group> groups;
// Iterate over all the nodes to identify the first node of each group
for (int rank = 0; rank < nodes_; ++rank) {
const cuda::bcube::Node& firstNode = allNodes_[rank];
      // Only a node with no peers assigned yet for this step can be a
      // group's first node
if (0 == firstNode.getPeersPerStep(step).size()) {
// Create a new group
groups.emplace_back(
step, firstNode, peerDistance, base_, nodes_, totalNumElems_);
        // check the size to keep lint happy :/
        if (0 < groups.size()) {
          // Iterate over all the peer nodes and set them up for the step
updateGroupNodes(step, groups.back());
}
} // if (0 == firstNode ...
} // for (int rank = 0..
// Done iterating over all the nodes. Update peerDistance for next step.
peerDistance *= base_;
} // for (int step ...
} // setupNodes
template <typename T, typename W>
template <typename U>
void CudaAllreduceBcube<T, W>::init(
typename std::enable_if<
std::is_same<U, CudaHostWorkspace<T>>::value,
typename U::Pointer>::type*) {
  // Since reduction is executed on the CPU, the scratch space where
  // partial results are accumulated is a new host-side buffer.
scratch_ = W::Pointer::alloc(totalNumElems_);
scratchStream_ = &streams_[0];
// Set up local reduction and broadcast operations on the host.
// If devicePtrs_.size() == 1 these functions construct an op that
// executes a memcpy such that scratch_ always holds the result.
if (bytes_ < kOnDeviceThreshold) {
localReduceOp_ =
cudaHostReduce(streams_, devicePtrs_, scratch_, fn_, 0, totalNumElems_);
localBroadcastOp_ =
cudaHostBroadcast(streams_, devicePtrs_, scratch_, 0, totalNumElems_);
} else {
localReduceOp_ = cudaDeviceReduce(
streams_, devicePtrs_, scratch_, fn_, 0, totalNumElems_);
localBroadcastOp_ =
cudaDeviceBroadcast(streams_, devicePtrs_, scratch_, 0, totalNumElems_);
}
}
template <typename T, typename W>
template <typename U>
void CudaAllreduceBcube<T, W>::init(
typename std::enable_if<
std::is_same<U, CudaDeviceWorkspace<T>>::value,
typename U::Pointer>::type*) {
// The networking adapter does DMA to/from GPU memory, so we should reduce
// onto the device that's closest to the networking adapter bound
// to our context. This uses PCI distance to find closest GPU.
auto index = findCudaDevicePointerClosestToDevice(
devicePtrs_, this->context_->getDevice());
scratch_ = CudaDevicePointer<T>::create(devicePtrs_[index]);
scratchStream_ = &streams_[index];
// Set up local reduction and broadcast operations on the device.
// When running with a device workspace we intend to never leave the device.
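  // With a single device pointer, scratch_ aliases it directly, so no
  // local reduce/broadcast ops are needed; run() skips them when unset.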
if (devicePtrs_.size() > 1) {
localReduceOp_ = cudaDeviceReduce(
streams_, devicePtrs_, scratch_, fn_, 0, totalNumElems_);
localBroadcastOp_ =
cudaDeviceBroadcast(streams_, devicePtrs_, scratch_, 0, totalNumElems_);
}
}
namespace cuda {
namespace bcube {
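/*
 * Node records, for each step, which peers a rank exchanges data with
 * and which sub-range (offset, count) of the buffer it is responsible
 * for. Group describes one set of ranks that communicate during a step.
 */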
Node::Node(int rank, int steps) : rank_(rank) {
for (int i = 0; i < steps; ++i) {
peersPerStep_.emplace_back();
}
  // Use resize (not reserve) so that setPerStepAttributes() can assign
  // through operator[] without invoking undefined behavior.
  numElemsPerStep_.resize(steps);
  ptrOffsetPerStep_.resize(steps);
}
int Node::getRank() const {
return rank_;
}
void Node::setPerStepAttributes(
int step,
const std::vector<int>& peerRanks,
int numElems,
int offset) {
for (int peerRank : peerRanks) {
if (peerRank != rank_) {
peersPerStep_[step].emplace_back(peerRank);
}
}
numElemsPerStep_[step] = numElems;
ptrOffsetPerStep_[step] = offset;
}
const std::vector<int>& Node::getPeersPerStep(int step) const {
return peersPerStep_[step];
}
int Node::getNumElemsPerStep(int step) const {
return numElemsPerStep_[step];
}
int Node::getPtrOffsetPerStep(int step) const {
return ptrOffsetPerStep_[step];
}
Group::Group(
int step,
const Node& firstNode,
int peerDistance,
int base,
int nodes,
int totalNumElems)
: nodeRanks_(getNodeRanks(firstNode.getRank(), peerDistance, base, nodes)),
ptrOffset_((0 == step) ? 0 : firstNode.getPtrOffsetPerStep(step - 1)),
numElems_(
computeNumElems(step, firstNode, nodeRanks_.size(), totalNumElems)) {}
const std::vector<int>& Group::getNodeRanks() const {
return nodeRanks_;
}
int Group::getPtrOffset() const {
return ptrOffset_;
}
int Group::getNumElems() const {
return numElems_;
}
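/*
 * A group works on its first node's chunk from the previous step (or the
 * whole buffer at step 0), but never on fewer elements than it has
 * peers, so the even split in updateGroupNodes leaves no peer with an
 * empty chunk.
 */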
int Group::computeNumElems(
int step,
const Node& firstNode,
int peers,
int count) {
int groupCount = (0 == step) ? count : firstNode.getNumElemsPerStep(step - 1);
return std::max(groupCount, peers);
}
std::vector<int> Group::getNodeRanks(
int firstNodeRank,
int peerDistance,
int base,
int nodes) const {
std::vector<int> groupPeers;
for (int i = 0; i < base; ++i) {
int peerRank = firstNodeRank + i * peerDistance;
if (peerRank < nodes) {
groupPeers.emplace_back(peerRank);
}
}
return groupPeers;
}
} // namespace bcube
} // namespace cuda
#define INSTANTIATE_TEMPLATE(T) \
template class CudaAllreduceBcube<T, CudaHostWorkspace<T>>; \
template class CudaAllreduceBcube<T, CudaDeviceWorkspace<T>>;
INSTANTIATE_TEMPLATE(int8_t);
INSTANTIATE_TEMPLATE(uint8_t);
INSTANTIATE_TEMPLATE(int32_t);
INSTANTIATE_TEMPLATE(int64_t);
INSTANTIATE_TEMPLATE(uint64_t);
INSTANTIATE_TEMPLATE(float);
INSTANTIATE_TEMPLATE(double);
INSTANTIATE_TEMPLATE(float16);
} // namespace gloo