void bcube()

in gloo/allreduce.cc [429:670]

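Implements the bcube variant of allreduce: a reduce/scatter phase followed by an allgather phase, with processes organized into groups of two peers at each step (where the context size allows).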

void bcube(
    const detail::AllreduceOptionsImpl& opts,
    ReduceRangeFunction reduceInputs,
    BroadcastRangeFunction broadcastOutputs) {
  const auto& context = opts.context;
  const auto slot = Slot::build(kAllreduceSlotPrefix, opts.tag);
  const auto elementSize = opts.elementSize;
  auto& out = opts.out[0];

  // Use a base of two: peers are grouped in pairs at each step where the
  // context size allows (see computeGroupSizePerStep).
  constexpr auto n = 2;

  // Figure out the group size at each step of the algorithm, which also
  // determines the number of steps.
  const auto groupSizePerStep = computeGroupSizePerStep(context->size, n);

  struct group {
    // Distance between peers in this group.
    size_t peerDistance;

    // Segment that this group is responsible for reducing.
    size_t bufferOffset;
    size_t bufferLength;

    // The process ranks that are a member of this group.
    std::vector<size_t> ranks;

    // Upper bound on the length of the chunk that each process holds
    // the reduced values for by the end of this group's reduction step.
    size_t chunkLength;

    // Chunk within the segment that this process is responsible for reducing.
    size_t myChunkOffset;
    size_t myChunkLength;
  };

  // Compute the details of a group at every algorithm step.
  // We keep this in a vector because we iterate through it in forward order in
  // the reduce/scatter phase and in backward order in the allgather phase.
  std::vector<struct group> groups;
  {
    struct group group;
    group.peerDistance = 1;
    group.bufferOffset = 0;
    group.bufferLength = opts.elements;
    for (const size_t groupSize : groupSizePerStep) {
      const size_t groupRank = (context->rank / group.peerDistance) % groupSize;
      const size_t baseRank = context->rank - (groupRank * group.peerDistance);
      group.ranks.reserve(groupSize);
      for (size_t i = 0; i < groupSize; i++) {
        group.ranks.push_back(baseRank + i * group.peerDistance);
      }

      // Compute the length of the chunk we're exchanging at this step.
      // The division rounds up, so the final chunk in a segment may be
      // shorter than the others (or even empty).
      group.chunkLength = ((group.bufferLength + (groupSize - 1)) / groupSize);

      // This process is responsible for reducing the chunk at index
      // <groupRank> within the current segment. The length is clamped
      // because the segment may not divide evenly across the group.
      group.myChunkOffset =
          group.bufferOffset + (groupRank * group.chunkLength);
      group.myChunkLength = std::min(
          size_t(group.chunkLength),
          size_t(std::max(
              int64_t(0),
              int64_t(group.bufferLength) -
                  int64_t(groupRank * group.chunkLength))));

      // Store a const copy of this group in the vector.
      groups.push_back(group);

      // Initialize with updated peer distance and segment offset and length.
      struct group nextGroup;
      nextGroup.peerDistance = group.peerDistance * groupSize;
      nextGroup.bufferOffset = group.myChunkOffset;
      nextGroup.bufferLength = group.myChunkLength;
      std::swap(group, nextGroup);
    }
  }
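  // Worked example (illustration only, not part of the original source):
  // with 8 processes and opts.elements == 8, and assuming
  // computeGroupSizePerStep(8, 2) yields {2, 2, 2}, rank 5 computes:
  //   step 0: peerDistance 1, ranks {4, 5}, segment [0, 8), myChunk [4, 8)
  //   step 1: peerDistance 2, ranks {5, 7}, segment [4, 8), myChunk [4, 6)
  //   step 2: peerDistance 4, ranks {1, 5}, segment [4, 6), myChunk [5, 6)
  // so rank 5 ends the reduce/scatter phase holding the fully reduced
  // element at offset 5.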

  // The chunk length is rounded up, so the maximum scratch space we need
  // might be larger than the size of the output buffer. Compute the maximum
  // length needed across all steps.
  size_t bufferLength = opts.elements;
  for (const auto& group : groups) {
    bufferLength =
        std::max(bufferLength, group.ranks.size() * group.chunkLength);
  }
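  // For example, with 2 processes and 3 elements (a single group of size 2),
  // chunkLength is 2, so the scratch space must hold 2 * 2 = 4 elements even
  // though the output buffer holds only 3.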

  // Allocate scratch space to receive data from peers.
  const size_t bufferSize = bufferLength * elementSize;
  std::unique_ptr<uint8_t[]> buffer(new uint8_t[bufferSize]);
  std::unique_ptr<transport::UnboundBuffer> tmp =
      context->createUnboundBuffer(buffer.get(), bufferSize);
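  // During the reduce/scatter phase, the partial result received from the
  // peer at index i within the current group lands at offset
  // i * chunkLength in this scratch buffer.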

  // Reduce/scatter.
  for (size_t step = 0; step < groups.size(); step++) {
    const auto& group = groups[step];

    // Issue receive operations for chunks from peers. Every peer sends us
    // its partial result for the chunk this process is responsible for,
    // which is why the receive length is always myChunkLength.
    for (size_t i = 0; i < group.ranks.size(); i++) {
      const auto src = group.ranks[i];
      if (src == context->rank) {
        continue;
      }
      tmp->recv(
          src,
          slot,
          i * group.chunkLength * elementSize,
          group.myChunkLength * elementSize);
    }

    // Issue send operations for local chunks to peers. The chunk length is
    // recomputed per destination because the final chunk in the segment may
    // be shorter than chunkLength.
    for (size_t i = 0; i < group.ranks.size(); i++) {
      const auto dst = group.ranks[i];
      if (dst == context->rank) {
        continue;
      }
      const size_t currentChunkOffset =
          group.bufferOffset + i * group.chunkLength;
      const size_t currentChunkLength = std::min(
          size_t(group.chunkLength),
          size_t(std::max(
              int64_t(0),
              int64_t(group.bufferLength) - int64_t(i * group.chunkLength))));
      // Compute the local reduction only in the first step of the algorithm.
      // In subsequent steps, we already have a partially reduced result.
      if (step == 0) {
        reduceInputs(
            currentChunkOffset * elementSize, currentChunkLength * elementSize);
      }
      out->send(
          dst,
          slot,
          currentChunkOffset * elementSize,
          currentChunkLength * elementSize);
    }

    // Wait for send and receive operations to complete. Each call to
    // waitRecv/waitSend retires one outstanding operation, so one call per
    // peer drains them all.
    for (size_t i = 0; i < group.ranks.size(); i++) {
      const auto peer = group.ranks[i];
      if (peer == context->rank) {
        continue;
      }
      tmp->waitRecv();
      out->waitSend();
    }

    // In the first step, prepare the chunk this process is responsible for
    // with the reduced version of its inputs (if multiple are specified).
    if (step == 0) {
      reduceInputs(
          group.myChunkOffset * elementSize, group.myChunkLength * elementSize);
    }

    // Reduce the chunks received from peers into this process's own chunk.
    for (size_t i = 0; i < group.ranks.size(); i++) {
      const auto src = group.ranks[i];
      if (src == context->rank) {
        continue;
      }
      opts.reduce(
          static_cast<uint8_t*>(out->ptr) + (group.myChunkOffset * elementSize),
          static_cast<const uint8_t*>(out->ptr) +
              (group.myChunkOffset * elementSize),
          static_cast<const uint8_t*>(tmp->ptr) +
              (i * group.chunkLength * elementSize),
          group.myChunkLength);
    }
  }
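  // In the 8-rank example above, step 0 for rank 5 amounts to: receive rank
  // 4's partial result for [4, 8) into the scratch buffer, send the locally
  // reduced [0, 4) to rank 4, then reduce the received data into out[4, 8).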

  // There is one chunk that already contains the final result, and it can
  // be broadcast locally to out[1..N] right away, if applicable. Doing so
  // means that during the allgather phase we only have to broadcast the
  // chunks we receive from our peers.
  {
    const auto& group = groups.back();
    broadcastOutputs(
        group.myChunkOffset * elementSize, group.myChunkLength * elementSize);
  }

  // Allgather.
  for (auto it = groups.rbegin(); it != groups.rend(); it++) {
    const auto& group = *it;

    // Issue receive operations for reduced chunks from peers.
    for (size_t i = 0; i < group.ranks.size(); i++) {
      const auto src = group.ranks[i];
      if (src == context->rank) {
        continue;
      }
      const size_t currentChunkOffset =
          group.bufferOffset + i * group.chunkLength;
      const size_t currentChunkLength = std::min(
          size_t(group.chunkLength),
          size_t(std::max(
              int64_t(0),
              int64_t(group.bufferLength) - int64_t(i * group.chunkLength))));
      out->recv(
          src,
          slot,
          currentChunkOffset * elementSize,
          currentChunkLength * elementSize);
    }

    // Issue send operations for this process's reduced chunk to peers.
    for (size_t i = 0; i < group.ranks.size(); i++) {
      const auto dst = group.ranks[i];
      if (dst == context->rank) {
        continue;
      }
      out->send(
          dst,
          slot,
          group.myChunkOffset * elementSize,
          group.myChunkLength * elementSize);
    }

    // Wait for operations to complete.
    for (size_t i = 0; i < group.ranks.size(); i++) {
      const auto peer = group.ranks[i];
      if (peer == context->rank) {
        continue;
      }
      out->waitRecv();
      out->waitSend();
    }

    // Broadcast the chunks we just received to the remaining output buffers,
    // if applicable. This process's own chunk was already broadcast at the
    // end of the reduce/scatter phase.
    for (size_t i = 0; i < group.ranks.size(); i++) {
      const auto peer = group.ranks[i];
      if (peer == context->rank) {
        continue;
      }
      const size_t currentChunkOffset =
          group.bufferOffset + i * group.chunkLength;
      const size_t currentChunkLength = std::min(
          size_t(group.chunkLength),
          size_t(std::max(
              int64_t(0),
              int64_t(group.bufferLength) - int64_t(i * group.chunkLength))));
      broadcastOutputs(
          currentChunkOffset * elementSize, currentChunkLength * elementSize);
    }
  }
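  // Every output buffer now holds the fully reduced result for the entire
  // range of opts.elements.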
}
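
The helper computeGroupSizePerStep is defined elsewhere in gloo/allreduce.cc and is not shown here. A minimal sketch of the behavior the function above assumes, factoring the context size into per-step group sizes (an illustration, not the actual gloo implementation):

#include <cstddef>
#include <vector>

// Sketch: factor `size` into group sizes, preferring groups of `n`.
// Any remainder that does not divide by `n` becomes one final, larger group.
std::vector<size_t> computeGroupSizePerStep(size_t size, size_t n) {
  std::vector<size_t> result;
  while (size % n == 0) {
    result.push_back(n);
    size /= n;
  }
  if (size > 1) {
    result.push_back(size);
  }
  return result;
}

// e.g. computeGroupSizePerStep(8, 2) == {2, 2, 2}
//      computeGroupSizePerStep(6, 2) == {2, 3}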