in gloo/allreduce.cc [429:670]
void bcube(
const detail::AllreduceOptionsImpl& opts,
ReduceRangeFunction reduceInputs,
BroadcastRangeFunction broadcastOutputs) {
const auto& context = opts.context;
const auto slot = Slot::build(kAllreduceSlotPrefix, opts.tag);
const auto elementSize = opts.elementSize;
auto& out = opts.out[0];
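// Base of the bcube algorithm: the nominal group size used at every
// step of the reduction.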
constexpr auto n = 2;
// Figure out the number of steps in this algorithm.
const auto groupSizePerStep = computeGroupSizePerStep(context->size, n);
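// For example, assuming computeGroupSizePerStep factors the context
// size into groups of n with any remainder folded into a final larger
// group, 8 processes yield {2, 2, 2} and 12 processes yield {2, 2, 3}.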
struct group {
// Distance between peers in this group.
size_t peerDistance;
// Segment that this group is responsible for reducing.
size_t bufferOffset;
size_t bufferLength;
// The process ranks that are a member of this group.
std::vector<size_t> ranks;
// Upper bound on the length of the chunk that each process holds
// the reduced values for by the end of this group's reduction step.
size_t chunkLength;
// Chunk within the segment that this process is responsible for reducing.
size_t myChunkOffset;
size_t myChunkLength;
};
// Compute the details of a group at every algorithm step.
// We keep this in a vector because we iterate through it in forward order in
// the reduce/scatter phase and in backward order in the allgather phase.
std::vector<struct group> groups;
{
struct group group;
group.peerDistance = 1;
group.bufferOffset = 0;
group.bufferLength = opts.elements;
for (const size_t groupSize : groupSizePerStep) {
const size_t groupRank = (context->rank / group.peerDistance) % groupSize;
const size_t baseRank = context->rank - (groupRank * group.peerDistance);
group.ranks.reserve(groupSize);
for (size_t i = 0; i < groupSize; i++) {
group.ranks.push_back(baseRank + i * group.peerDistance);
}
// Compute the length of the chunk we're exchanging at this step.
group.chunkLength = ((group.bufferLength + (groupSize - 1)) / groupSize);
// This process is responsible for reducing the chunk at index
// <groupRank> of <groupSize> within the current segment.
group.myChunkOffset =
group.bufferOffset + (groupRank * group.chunkLength);
group.myChunkLength = std::min(
size_t(group.chunkLength),
size_t(std::max(
int64_t(0),
int64_t(group.bufferLength) -
int64_t(groupRank * group.chunkLength))));
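// The min/max clamp above handles an uneven split: when the segment is
// not a multiple of the chunk length, trailing ranks get a shorter
// (possibly empty) chunk.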
// Store a copy of this group in the vector.
groups.push_back(group);
// Initialize the next group with the updated peer distance and with
// this process's chunk as the segment to reduce next.
struct group nextGroup;
nextGroup.peerDistance = group.peerDistance * groupSize;
nextGroup.bufferOffset = group.myChunkOffset;
nextGroup.bufferLength = group.myChunkLength;
std::swap(group, nextGroup);
}
}
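// To make the schedule concrete: with 8 processes and n == 2, the loop
// above produces three steps of pairwise groups:
//   step 0: peerDistance 1, groups {0,1} {2,3} {4,5} {6,7}
//   step 1: peerDistance 2, groups {0,2} {1,3} {4,6} {5,7}
//   step 2: peerDistance 4, groups {0,4} {1,5} {2,6} {3,7}
// Each step shrinks the segment a process reduces by the group size, so
// after the last step every rank holds the fully reduced values for a
// disjoint eighth of the output (up to rounding).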
// The chunk length is rounded up, so the maximum scratch space we need
// might be larger than the size of the output buffer. Compute the
// maximum across all steps so that a single allocation suffices.
size_t bufferLength = opts.elements;
for (const auto& group : groups) {
bufferLength =
std::max(bufferLength, group.ranks.size() * group.chunkLength);
}
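// For example, splitting 3 elements over a group of 2 gives a chunk
// length of 2, so the scratch buffer must hold 2 * 2 = 4 elements even
// though the output itself only holds 3.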
// Allocate scratch space to receive data from peers.
const size_t bufferSize = bufferLength * elementSize;
std::unique_ptr<uint8_t[]> buffer(new uint8_t[bufferSize]);
std::unique_ptr<transport::UnboundBuffer> tmp =
context->createUnboundBuffer(buffer.get(), bufferSize);
// Reduce/scatter.
for (size_t step = 0; step < groups.size(); step++) {
const auto& group = groups[step];
// Issue receive operations for chunks from peers.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto src = group.ranks[i];
if (src == context->rank) {
continue;
}
tmp->recv(
src,
slot,
i * group.chunkLength * elementSize,
group.myChunkLength * elementSize);
}
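// Every peer in this group sends us its partial result for the chunk
// this process is responsible for. Peer i's data is staged at scratch
// offset i * chunkLength, so the receives may complete in any order
// before being reduced below.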
// Issue send operations for local chunks to peers.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto dst = group.ranks[i];
if (dst == context->rank) {
continue;
}
const size_t currentChunkOffset =
group.bufferOffset + i * group.chunkLength;
const size_t currentChunkLength = std::min(
size_t(group.chunkLength),
size_t(std::max(
int64_t(0),
int64_t(group.bufferLength) - int64_t(i * group.chunkLength))));
// Compute the local reduction only in the first step of the algorithm.
// In subsequent steps, we already have a partially reduced result.
if (step == 0) {
reduceInputs(
currentChunkOffset * elementSize, currentChunkLength * elementSize);
}
out->send(
dst,
slot,
currentChunkOffset * elementSize,
currentChunkLength * elementSize);
}
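// Note that the sends above read straight from the output buffer: at
// step 0, reduceInputs has just folded the local inputs into it, and at
// later steps it already holds the partial reduction produced by the
// previous step.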
// Wait for send and receive operations to complete.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto peer = group.ranks[i];
if (peer == context->rank) {
continue;
}
tmp->waitRecv();
out->waitSend();
}
// In the first step, prepare the chunk this process is responsible for
// with the reduced version of its inputs (if multiple are specified).
if (step == 0) {
reduceInputs(
group.myChunkOffset * elementSize, group.myChunkLength * elementSize);
}
// Reduce chunks from peers.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto src = group.ranks[i];
if (src == context->rank) {
continue;
}
opts.reduce(
static_cast<uint8_t*>(out->ptr) + (group.myChunkOffset * elementSize),
static_cast<const uint8_t*>(out->ptr) +
(group.myChunkOffset * elementSize),
static_cast<const uint8_t*>(tmp->ptr) +
(i * group.chunkLength * elementSize),
group.myChunkLength);
}
}
// At this point there is exactly one chunk that already contains its
// final, fully reduced result, and it can be broadcast locally to
// out[1..N], if applicable. Doing so means we only have to broadcast
// to out[1..N] the chunks we receive from peers during the allgather
// phase.
{
const auto& group = groups.back();
broadcastOutputs(
group.myChunkOffset * elementSize, group.myChunkLength * elementSize);
}
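// The allgather phase walks the groups in reverse order. At each step,
// this process sends the chunk it holds the final reduced values for
// and receives the final chunks of its peers; afterwards, the group's
// entire segment is final. That segment is precisely this process's
// chunk in the group handled next, so the invariant carries through to
// the end, when the whole output buffer is final on every rank.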
// Allgather.
for (auto it = groups.rbegin(); it != groups.rend(); it++) {
const auto& group = *it;
// Issue receive operations for reduced chunks from peers.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto src = group.ranks[i];
if (src == context->rank) {
continue;
}
const size_t currentChunkOffset =
group.bufferOffset + i * group.chunkLength;
const size_t currentChunkLength = std::min(
size_t(group.chunkLength),
size_t(std::max(
int64_t(0),
int64_t(group.bufferLength) - int64_t(i * group.chunkLength))));
out->recv(
src,
slot,
currentChunkOffset * elementSize,
currentChunkLength * elementSize);
}
// Issue send operations for this process's reduced chunk to peers.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto dst = group.ranks[i];
if (dst == context->rank) {
continue;
}
out->send(
dst,
slot,
group.myChunkOffset * elementSize,
group.myChunkLength * elementSize);
}
// Wait for operations to complete.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto peer = group.ranks[i];
if (peer == context->rank) {
continue;
}
out->waitRecv();
out->waitSend();
}
// Broadcast result to multiple output buffers, if applicable.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto peer = group.ranks[i];
if (peer == context->rank) {
continue;
}
const size_t currentChunkOffset =
group.bufferOffset + i * group.chunkLength;
const size_t currentChunkLength = std::min(
size_t(group.chunkLength),
size_t(std::max(
int64_t(0),
int64_t(group.bufferLength) - int64_t(i * group.chunkLength))));
broadcastOutputs(
currentChunkOffset * elementSize, currentChunkLength * elementSize);
}
}
}