in horovod/mxnet/mpi_ops.cc [198:250]
void DoHorovodOperationCudaOnCPU(void*, void* on_complete_ptr, void* param) {
ThrowIfError(common::CheckInitialized());
auto on_complete = *static_cast<CallbackOnComplete*>(on_complete_ptr);
auto ops_param = static_cast<MpiOpsParam*>(param);
auto name = ops_param->op_name;
auto hvd_cpu_buffer = std::make_shared<MXTensor>(ops_param->cpu_input_tensor.get());
auto hvd_context = std::make_shared<MXOpContext>(
CPU_DEVICE_ID, ops_param->cpu_output_tensor.get());
auto average = ops_param->average;
auto prescale_factor = ops_param->prescale_factor;
auto postscale_factor = ops_param->postscale_factor;
Status enqueue_result;
switch (ops_param->op_type) {
case OperationType::ALLREDUCE:
enqueue_result = EnqueueTensorAllreduce(
hvd_context, hvd_cpu_buffer, hvd_cpu_buffer, nullptr, name, CPU_DEVICE_ID,
[on_complete](const Status& status) {
InvokeCompleteCallback(on_complete, status);
}, (average) ? ReduceOp::AVERAGE : ReduceOp::SUM, prescale_factor, postscale_factor);
break;
case OperationType::ALLGATHER:
enqueue_result = EnqueueTensorAllgather(
hvd_context, hvd_cpu_buffer, nullptr, name, CPU_DEVICE_ID,
[on_complete](const Status& status) {
InvokeCompleteCallback(on_complete, status);
});
break;
case OperationType::BROADCAST:
enqueue_result = EnqueueTensorBroadcast(
hvd_context, hvd_cpu_buffer, hvd_cpu_buffer, ops_param->root_rank,
nullptr, name, CPU_DEVICE_ID,
[on_complete](const Status& status) {
InvokeCompleteCallback(on_complete, status);
});
break;
case OperationType::ALLTOALL:
{
auto hvd_splits = std::make_shared<MXTensor>(ops_param->splits_tensor.get());
enqueue_result = EnqueueTensorAlltoall(
hvd_context, hvd_cpu_buffer, hvd_splits, nullptr, name, CPU_DEVICE_ID,
[on_complete](const Status& status) {
InvokeCompleteCallback(on_complete, status);
});
break;
}
default:
throw std::logic_error("Unsupported Horovod operation type.");
}
ThrowIfError(enqueue_result);
}