// gloo/cuda_collectives_native.h
/**
* Copyright (c) 2017-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once

#include <algorithm>
#include <cmath>

#include "gloo/common/common.h"
#include "gloo/common/logging.h"
#include "gloo/cuda.h"
#include "gloo/cuda_private.h"

namespace gloo {

// Below works both for CudaHostPointer and CudaDevicePointer
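//
// CudaLocalNativeReduce reduces a set of device pointers into a single target
// pointer using a pairwise reduction tree over CUDA peer-to-peer access: at
// every step, the second pointer of each pair is reduced into the first,
// halving the number of partial results, until a single pointer holds the
// full result, which is then copied to the target.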
template <typename T, typename Dst>
class CudaLocalNativeReduce : public LocalOp<T> {
public:
CudaLocalNativeReduce(
std::vector<CudaStream>& streams,
std::vector<CudaDevicePointer<T> >& devicePtrs,
Dst& targetPtr,
const CudaReductionFunction<T>* fn,
size_t offset,
size_t count)
: streams_(streams),
targetPtr_(targetPtr.range(offset, count)),
fn_(fn),
numPtrs_(devicePtrs.size()),
steps_(log2(numPtrs_)) {
// Only works with power-of-2 number of pointers
    GLOO_ENFORCE_EQ(1 << steps_, numPtrs_, "Not power of two");
// Incorporate offset/count into devicePtrs
devicePtrs_.reserve(devicePtrs.size());
for (const auto& ptr : devicePtrs) {
devicePtrs_.push_back(ptr.range(offset, count));
}
// Add level of indirection so that we can shuffle this instead
// of shuffling BOTH the streams and device ptr vectors.
for (auto i = 0; i < numPtrs_; i++) {
indices_.push_back(i);
}
// Shuffle order in an attempt to evenly spread work across devices when
// dealing with multiple instances of this operation.
std::random_shuffle(indices_.begin(), indices_.end());
    // Initialize: walk the same pairing schedule used by runAsync() and
    // enable peer access between every pair of devices involved, so the
    // reduction can read the peer pointer directly.
CudaDeviceGuard guard;
for (auto i = 0; i < steps_; i++) {
auto sz = 1 << i;
for (auto j = 0; j < numPtrs_; j += sz * 2) {
auto indexA = indices_[j];
auto indexB = indices_[j + sz];
auto devA = devicePtrs_[indexA].getDeviceID();
auto devB = devicePtrs_[indexB].getDeviceID();
// Number of elements must be equal
GLOO_ENFORCE_EQ(
devicePtrs_[indexA].getCount(),
devicePtrs_[indexB].getCount());
// Devices must be able to access each others memory
int canAccessPeer = 0;
CUDA_CHECK(cudaDeviceCanAccessPeer(&canAccessPeer, devA, devB));
GLOO_ENFORCE_EQ(
1,
canAccessPeer,
"GPU ",
devA,
" does not have peer access to GPU ",
devB);
// Enable peer access for devA to memory on devB
CUDA_CHECK(cudaSetDevice(devA));
cudaDeviceEnablePeerAccess(devB, 0);
// Use cudaGetLastError so that any error is cleared.
auto err = cudaGetLastError();
if (err != cudaErrorPeerAccessAlreadyEnabled) {
CUDA_CHECK(err);
}
}
}
}
virtual void runAsync() {
CudaDeviceGuard guard;
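    // Tree reduce: at step i, the pointer at indices_[j + 2^i] is reduced
    // into the pointer at indices_[j], halving the number of partial results
    // each step until devicePtrs_[indices_[0]] holds the full reduction.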
for (auto i = 0; i < steps_; i++) {
auto sz = 1 << i;
for (auto j = 0; j < numPtrs_; j += sz * 2) {
const auto indexA = indices_[j];
const auto indexB = indices_[j + sz];
auto& streamA = streams_[indexA];
auto& streamB = streams_[indexB];
// Record event on secondary stream
CUDA_CHECK(cudaSetDevice(devicePtrs_[indexB].getDeviceID()));
CUDA_CHECK(cudaEventRecord(
streamB.getEvent(),
streamB.getStream()));
// Make primary stream wait for secondary stream.
// This ensures any operations on the source pointer
// have finished before we start the reduction.
CUDA_CHECK(cudaSetDevice(devicePtrs_[indexA].getDeviceID()));
CUDA_CHECK(cudaStreamWaitEvent(
streamA.getStream(),
streamB.getEvent(),
0));
// Queue reduction
fn_->call(
devicePtrs_[indexA],
devicePtrs_[indexB],
devicePtrs_[indexA].getCount(),
streamA);
}
}
// Queue copy to target on the root stream
auto root = indices_[0];
streams_[root].copyAsync(targetPtr_, devicePtrs_[root]);
}
virtual void wait() {
// Wait for the final memory copy to complete
auto root = indices_[0];
streams_[root].wait();
}
protected:
std::vector<CudaStream>& streams_;
std::vector<CudaDevicePointer<T> > devicePtrs_;
Dst targetPtr_;
const CudaReductionFunction<T>* fn_;
const int numPtrs_;
const int steps_;
std::vector<int> indices_;
};
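
// Usage sketch (hypothetical; assumes streams, devicePtrs, targetPtr, and
// count were set up elsewhere with one CudaStream per device pointer, and
// that a summation reduction such as CudaReductionFunction<float>::sum is
// the desired reduction function):
//
//   CudaLocalNativeReduce<float, CudaDevicePointer<float> > reduce(
//       streams, devicePtrs, targetPtr,
//       CudaReductionFunction<float>::sum, 0, count);
//   reduce.runAsync();
//   reduce.wait();
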
// Below works both for CudaHostPointer and CudaDevicePointer
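//
// CudaLocalNativeBroadcast copies the source pointer into the first device
// pointer and then fans it out through a binary tree of device-to-device
// copies, doubling the number of device pointers that hold the data at
// every step.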
template <typename T, typename Src>
class CudaLocalNativeBroadcast : public LocalOp<T> {
public:
CudaLocalNativeBroadcast(
std::vector<CudaStream>& streams,
std::vector<CudaDevicePointer<T> >& devicePtrs,
Src& sourcePtr,
size_t offset,
size_t count)
: streams_(streams),
sourcePtr_(sourcePtr.range(offset, count)),
count_(count),
numPtrs_(devicePtrs.size()),
steps_(log2(numPtrs_)) {
// Only works with power-of-2 number of pointers
    GLOO_ENFORCE_EQ(1 << steps_, numPtrs_, "Not power of two");
// Incorporate offset/count into devicePtrs
devicePtrs_.reserve(devicePtrs.size());
for (const auto& ptr : devicePtrs) {
devicePtrs_.push_back(ptr.range(offset, count));
}
    // Initialize: walk the same pairing schedule used by runAsync() and
    // enable peer access between every pair of devices involved, so the
    // device-to-device copies can proceed directly.
CudaDeviceGuard guard;
for (auto i = steps_ - 1; i >= 0; i--) {
auto sz = 1 << i;
for (auto j = 0; j < numPtrs_; j += sz * 2) {
auto indexA = j;
auto indexB = j + sz;
auto devA = devicePtrs_[indexA].getDeviceID();
auto devB = devicePtrs_[indexB].getDeviceID();
// Number of elements must be equal
GLOO_ENFORCE_EQ(
devicePtrs_[indexA].getCount(),
devicePtrs_[indexB].getCount());
// Devices must be able to access each others memory
int canAccessPeer = 0;
CUDA_CHECK(cudaDeviceCanAccessPeer(&canAccessPeer, devA, devB));
GLOO_ENFORCE_EQ(
1,
canAccessPeer,
"GPU ",
devA,
" does not have peer access to GPU ",
devB);
// Enable peer access for devA to memory on devB
CUDA_CHECK(cudaSetDevice(devA));
cudaDeviceEnablePeerAccess(devB, 0);
// Use cudaGetLastError so that any error is cleared.
auto err = cudaGetLastError();
if (err != cudaErrorPeerAccessAlreadyEnabled) {
CUDA_CHECK(err);
}
}
}
}
virtual void runAsync() {
CudaDeviceGuard guard;
// Copy from source ptr to first device ptr
streams_[0].copyAsync(devicePtrs_[0], sourcePtr_);
// Tree broadcast
for (auto i = steps_ - 1; i >= 0; i--) {
auto sz = 1 << i;
for (auto j = 0; j < numPtrs_; j += sz * 2) {
const auto indexA = j;
const auto indexB = j + sz;
auto& streamA = streams_[indexA];
auto& streamB = streams_[indexB];
// Record event on target stream
CUDA_CHECK(cudaSetDevice(
devicePtrs_[indexB].getDeviceID()));
CUDA_CHECK(cudaEventRecord(
streamB.getEvent(),
streamB.getStream()));
// Make source stream wait on target stream.
// This ensures any operations on the target pointer
// have finished before we start the copy.
CUDA_CHECK(cudaSetDevice(
devicePtrs_[indexA].getDeviceID()));
CUDA_CHECK(cudaStreamWaitEvent(
streamA.getStream(),
streamB.getEvent(),
0));
// Execute copy and wait for it to complete on the target
// stream. This ensures that in the next iteration of this
// loop the target can be used as source while knowing the
// previous copy has completed.
CUDA_CHECK(cudaMemcpyAsync(
*devicePtrs_[indexB],
*devicePtrs_[indexA],
count_ * sizeof(T),
cudaMemcpyDeviceToDevice,
streamA.getStream()));
CUDA_CHECK(cudaEventRecord(
streamA.getEvent(),
streamA.getStream()));
CUDA_CHECK(cudaSetDevice(
devicePtrs_[indexB].getDeviceID()));
CUDA_CHECK(cudaStreamWaitEvent(
streamB.getStream(),
streamA.getEvent(),
0));
// Emit event on the target stream so we can wait on all
// events in the wait() function. Otherwise waiting on
// this event would NOT indicate completion.
CUDA_CHECK(cudaEventRecord(
streamB.getEvent(),
streamB.getStream()));
}
}
}
virtual void wait() {
// Wait for all memory copies on the source streams and receipt
// confirmation on the target streams to complete.
for (auto& stream : streams_) {
stream.wait();
}
}
protected:
std::vector<CudaStream>& streams_;
std::vector<CudaDevicePointer<T> > devicePtrs_;
Src sourcePtr_;
const int count_;
const int numPtrs_;
const int steps_;
};
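
// Usage sketch (hypothetical; assumes streams, devicePtrs, sourcePtr, and
// count were set up elsewhere with one CudaStream per device pointer, and
// that sourcePtr covers at least offset + count elements):
//
//   CudaLocalNativeBroadcast<float, CudaDevicePointer<float> > broadcast(
//       streams, devicePtrs, sourcePtr, 0, count);
//   broadcast.runAsync();
//   broadcast.wait();
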
} // namespace gloo