void run()

in gloo/allreduce_halving_doubling.h [224:361]


  // Performs the allreduce over the local buffers in ptrs_, leaving the
  // result in every ptrs_[i].
  //
  // Phases, in the order they appear below:
  //   1. Local reduce: every ptrs_[i] (i >= 1) is combined into ptrs_[0]
  //      through fn_.
  //   2. Reduce-scatter inside this rank's binary block
  //      (stepsWithinBlock_ steps).
  //   3. Exchange with the next smaller / next larger binary block, which
  //      only applies when the number of processes is not a power of two.
  //   4. Allgather inside the block, walking the same steps in reverse.
  //   5. Local broadcast: ptrs_[0] is copied into every other ptrs_[i].
  //
  // Returns immediately when count_ == 0. When contextSize_ == 1 only the
  // local reduce and local broadcast are performed.
  //
  // The order of sends, waits and notifications below is part of the
  // protocol with the peer ranks; do not reorder statements without
  // checking what the peer executes at the matching step.
  void run() {
    if (count_ == 0) {
      return;
    }
    // Element index into recvBuf_ for the current step. It is advanced by
    // numItems after each reduce-scatter step and moved back by numItems
    // before each allgather step.
    size_t bufferOffset = 0;
    // Amount by which bufferOffset moves for the current step; halved after
    // every reduce-scatter step and doubled after every allgather step.
    size_t numItems =
        stepsWithinBlock_ > 0 ? chunkSize_ << (steps_ - 1) : count_;

    // Local reduce: combine each additional local buffer with ptrs_[0].
    // (Presumably fn_->call(dst, src, n) accumulates src into dst -- confirm
    // against the reduction function's definition.)
    for (int i = 1; i < ptrs_.size(); i++) {
      fn_->call(ptrs_[0], ptrs_[i], count_);
    }
    // With a context of size 1 none of the communication below is executed:
    // copy the locally reduced ptrs_[0] to the other local buffers and stop.
    if (this->contextSize_ == 1) {
      // Broadcast ptrs_[0]
      for (int i = 1; i < ptrs_.size(); i++) {
        memcpy(ptrs_[i], ptrs_[0], bytes_);
      }
      return;
    }

    // Reduce-scatter
    // At step i this rank sends the segment described by
    // sendOffsets_[i]/sendCounts_[i] and reduces the segment described by
    // recvOffsets_[i]/recvCounts_[i]. Offsets and counts are in elements and
    // are scaled by sizeof(T) where a byte quantity is passed to send().
    for (int i = 0; i < stepsWithinBlock_; i++) {
      // A segment whose offset lies at or beyond count_ is skipped.
      if (sendOffsets_[i] < count_) {
        sendDataBufs_[i]->send(
            sendOffsets_[i] * sizeof(T), sendCounts_[i] * sizeof(T));
      }
      if (recvOffsets_[i] < count_) {
        // Wait for this step's incoming data, then combine the elements
        // read from recvBuf_ at bufferOffset into ptrs_[0].
        recvDataBufs_[i]->waitRecv();
        fn_->call(
            &ptrs_[0][recvOffsets_[i]],
            &recvBuf_[bufferOffset],
            recvCounts_[i]);
      }
      // bufferOffset advances and the notification is sent even when the
      // send and/or receive above were skipped.
      bufferOffset += numItems;
      // Notification for the peer of step i; the allgather loop below waits
      // on the matching recvNotificationBufs_[i] before sending data again.
      sendNotificationBufs_[i]->send();
      numItems >>= 1;
    }

    // Communication across binary blocks for non-power-of-two number of
    // processes

    // receive from smaller block
    // data sizes same as in the last step of intrablock reduce-scatter above
    // NOTE(review): indexing with stepsWithinBlock_ - 1 here (and in the
    // "send to smaller block" section below) assumes stepsWithinBlock_ > 0
    // whenever nextSmallerBlockSize_ != 0; that is not checked in this
    // function -- confirm it is guaranteed by the setup code.
    if (nextSmallerBlockSize_ != 0 && smallerBlockRecvDataBuf_ != nullptr) {
      smallerBlockRecvDataBuf_->waitRecv();
      // The incoming data is read from recvBuf_ at the current bufferOffset,
      // i.e. just past the regions used by the reduce-scatter steps.
      fn_->call(
          &ptrs_[0][recvOffsets_[stepsWithinBlock_ - 1]],
          &recvBuf_[bufferOffset],
          recvCounts_[stepsWithinBlock_ - 1]);
    }

    // Number of elements this rank holds after the reduce-scatter: the count
    // received in the last intra-block step, or all count_ elements when
    // there were no intra-block steps.
    const auto totalItemsToSend =
        stepsWithinBlock_ > 0 ? recvCounts_[stepsWithinBlock_ - 1] : count_;
    if (nextLargerBlockSize_ != 0 && totalItemsToSend != 0) {
      // scatter to larger block
      // Element offset in ptrs_[0] of the segment this rank holds.
      const auto offset =
          stepsWithinBlock_ > 0 ? recvOffsets_[stepsWithinBlock_ - 1] : 0;
      const auto numSendsAndReceivesToLargerBlock =
          nextLargerBlockSize_ / myBinaryBlockSize_;
      // The held segment is split into pieces of sendCountToLargerBlock_
      // elements, one per send buffer. Pieces that would start at or past
      // totalItemsToSend are skipped, and std::min trims the last piece so
      // it does not run past the end of the segment.
      for (int i = 0; i < numSendsAndReceivesToLargerBlock; i++) {
        if (sendCountToLargerBlock_ * i < totalItemsToSend) {
          largerBlockSendDataBufs_[i]->send(
              (offset + i * sendCountToLargerBlock_) * sizeof(T),
              std::min(
                  sendCountToLargerBlock_,
                  totalItemsToSend - sendCountToLargerBlock_ * i) *
                  sizeof(T));
        }
      }
      // no notification is needed because the forward and backward messages
      // across blocks are serialized in relation to each other

      // receive from larger blocks
      // Same skip condition as the send loop above, so a receive is awaited
      // exactly for those indices for which a piece was sent.
      for (int i = 0; i < numSendsAndReceivesToLargerBlock; i++) {
        if (sendCountToLargerBlock_ * i < totalItemsToSend) {
          largerBlockRecvDataBufs_[i]->waitRecv();
        }
      }
      // Overwrite (plain copy, no fn_ call) the held segment of ptrs_[0]
      // with the data read from recvBuf_ at bufferOffset. Presumably this is
      // the fully reduced segment returned by the larger block -- confirm.
      memcpy(
          &ptrs_[0][offset],
          &recvBuf_[bufferOffset],
          totalItemsToSend * sizeof(T));
    }

    // Send to smaller block (technically the beginning of allgather)
    // sentToSmallerBlock records whether a send was started so that the
    // matching waitSend() at the end of this function is only issued then.
    bool sentToSmallerBlock = false;
    if (nextSmallerBlockSize_ != 0) {
      if (recvOffsets_[stepsWithinBlock_ - 1] < count_) {
        sentToSmallerBlock = true;
        smallerBlockSendDataBuf_->send(
            recvOffsets_[stepsWithinBlock_ - 1] * sizeof(T),
            recvCounts_[stepsWithinBlock_ - 1] * sizeof(T));
      }
    }

    // Allgather
    // Walks the reduce-scatter steps in reverse with the roles of the
    // segments swapped: the recvOffsets_/recvCounts_ segment is sent and the
    // sendOffsets_/sendCounts_ segment is received. numItems restarts at the
    // amount used by the last reduce-scatter step so that bufferOffset
    // retraces the same positions in recvBuf_.
    numItems = chunkSize_ << (steps_ - stepsWithinBlock_);
    for (int i = stepsWithinBlock_ - 1; i >= 0; i--) {
      // verify that destination rank has received and processed this rank's
      // message during the reduce-scatter phase
      recvNotificationBufs_[i]->waitRecv();
      if (recvOffsets_[i] < count_) {
        sendDataBufs_[i]->send(
            recvOffsets_[i] * sizeof(T), recvCounts_[i] * sizeof(T));
      }
      // Move back to the recvBuf_ position that step i used during
      // reduce-scatter; done before the copy and regardless of whether the
      // receive below is skipped.
      bufferOffset -= numItems;
      if (sendOffsets_[i] < count_) {
        recvDataBufs_[i]->waitRecv();
        // Plain copy (no reduction) of the received segment into ptrs_[0].
        memcpy(
            &ptrs_[0][sendOffsets_[i]],
            &recvBuf_[bufferOffset],
            sendCounts_[i] * sizeof(T));
      }
      numItems <<= 1;

      // Send notification to the pair we just received from that
      // we're done dealing with the receive buffer.
      sendNotificationBufs_[i]->send();
    }

    // Broadcast ptrs_[0]
    // ptrs_[0] is copied into every other local buffer.
    for (int i = 1; i < ptrs_.size(); i++) {
      memcpy(ptrs_[i], ptrs_[0], bytes_);
    }

    // Wait for notifications from our peers within the block to make
    // sure we can send data immediately without risking overwriting
    // data in its receive buffer before it consumed that data.
    // (These consume the notifications the peers sent at the end of each
    // allgather step above.)
    for (int i = stepsWithinBlock_ - 1; i >= 0; i--) {
      recvNotificationBufs_[i]->waitRecv();
    }

    // We have to be sure the send to the smaller block (if any) has
    // completed before returning. If we don't, the buffer contents may
    // be modified by our caller.
    if (sentToSmallerBlock) {
      smallerBlockSendDataBuf_->waitSend();
    }
  }