in gloo/cuda_collectives_native.h [213:273]
virtual void runAsync() {
CudaDeviceGuard guard;
// Copy from source ptr to first device ptr
streams_[0].copyAsync(devicePtrs_[0], sourcePtr_);
// Tree broadcast
for (auto i = steps_ - 1; i >= 0; i--) {
auto sz = 1 << i;
for (auto j = 0; j < numPtrs_; j += sz * 2) {
const auto indexA = j;
const auto indexB = j + sz;
auto& streamA = streams_[indexA];
auto& streamB = streams_[indexB];
// Record event on target stream
CUDA_CHECK(cudaSetDevice(
devicePtrs_[indexB].getDeviceID()));
CUDA_CHECK(cudaEventRecord(
streamB.getEvent(),
streamB.getStream()));
// Make source stream wait on target stream.
// This ensures any operations on the target pointer
// have finished before we start the copy.
CUDA_CHECK(cudaSetDevice(
devicePtrs_[indexA].getDeviceID()));
CUDA_CHECK(cudaStreamWaitEvent(
streamA.getStream(),
streamB.getEvent(),
0));
// Execute copy and wait for it to complete on the target
// stream. This ensures that in the next iteration of this
// loop the target can be used as source while knowing the
// previous copy has completed.
CUDA_CHECK(cudaMemcpyAsync(
*devicePtrs_[indexB],
*devicePtrs_[indexA],
count_ * sizeof(T),
cudaMemcpyDeviceToDevice,
streamA.getStream()));
CUDA_CHECK(cudaEventRecord(
streamA.getEvent(),
streamA.getStream()));
CUDA_CHECK(cudaSetDevice(
devicePtrs_[indexB].getDeviceID()));
CUDA_CHECK(cudaStreamWaitEvent(
streamB.getStream(),
streamA.getEvent(),
0));
// Emit event on the target stream so we can wait on all
// events in the wait() function. Otherwise waiting on
// this event would NOT indicate completion.
CUDA_CHECK(cudaEventRecord(
streamB.getEvent(),
streamB.getStream()));
}
}
}