in horovod/mxnet/mpi_ops.cc [252:318]
inline void PushHorovodOperationCudaOnCPU(OperationType op_type, NDArray* input,
                                          NDArray* output, const char* name,
                                          int priority, int root_rank = -1,
                                          bool average = true,
                                          NDArray* splits = nullptr,
                                          double prescale_factor = 1.0,
                                          double postscale_factor = 1.0) {
  auto op_type_name = GetOpTypeName(op_type);
  auto op_name = GetOpName(op_type_name, name);
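  // Allocate intermediate CPU tensors used to stage the GPU input/output for
  // the CPU-based Horovod operation.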
  auto cpu_input_tensor = std::make_shared<NDArray>(Context::Create(Context::kCPU, 0),
                                                    input->dtype());
  auto cpu_output_tensor = std::make_shared<NDArray>(Context::Create(Context::kCPU, 0),
                                                     input->dtype());
  // Make async copy of input tensor to CPU tensor.
  TensorUtil::AsyncCopyCudaToCPU(input, cpu_input_tensor.get());
  std::shared_ptr<NDArray> splits_tensor;
  if (splits) {
    // We expect splits to be a tensor on CPU. Create CPU copy if required.
    if (!IsTensorOnCPU(splits)) {
      splits_tensor = std::make_shared<NDArray>(Context::Create(Context::kCPU, 0),
                                                splits->dtype());
      TensorUtil::AsyncCopyCudaToCPU(splits, splits_tensor.get());
    } else {
      splits_tensor = std::make_shared<NDArray>(*splits);
    }
  }
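  // Bundle the tensors and operation metadata into the parameter struct that is
  // handed to the engine callback (DoHorovodOperationCudaOnCPU).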
  auto ops_param = CreateMpiOpsParam(nullptr, nullptr, output, cpu_input_tensor,
                                     cpu_output_tensor, op_type, op_name, root_rank,
                                     average, splits_tensor, prescale_factor,
                                     postscale_factor);
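  // Obtain engine variable handles used for read/write dependency tracking
  // when pushing the asynchronous operation below.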
  auto input_var = input->var();
  auto output_var = output->var();
  auto cpu_input_var = cpu_input_tensor->var();
  auto cpu_output_var = cpu_output_tensor->var();
  if (op_type == OperationType::ALLGATHER ||
      op_type == OperationType::ALLTOALL) {
    // Use out-of-place path for operations that have unknown output size (allgather, alltoall)
    std::vector<void*> input_vars {cpu_input_var};
    if (splits) {
      // Add splits tensor to input list to enforce dependency on possible async D2H copy
      input_vars.push_back(splits_tensor->var());
    }
    MXEnginePushAsync(DoHorovodOperationCudaOnCPU, ops_param, DeleteMpiOpsParam,
                      &MX_EXEC_CTX, input_vars.data(), input_vars.size(),
                      &cpu_output_var, 1, &MX_FUNC_PROP, priority, op_type_name);
    // Since cpu_output_tensor is resized in out-of-place path, need
    // to wait for operation to complete before copying to GPU output.
    cpu_output_tensor->WaitToRead();
    // Make async copy of CPU output tensor to output tensor.
    TensorUtil::AsyncCopyCPUToCuda(cpu_output_tensor.get(), output);
  } else {
    // Use in-place otherwise
    MXEnginePushAsync(DoHorovodOperationCudaOnCPU, ops_param, DeleteMpiOpsParam,
                      &MX_EXEC_CTX, nullptr, 0, &cpu_input_var, 1,
                      &MX_FUNC_PROP, priority, op_type_name);
    // Make async copy of CPU input tensor to output tensor.
    TensorUtil::AsyncCopyCPUToCuda(cpu_input_tensor.get(), output);
  }
}