inline void PushHorovodOperationCudaOnCPU()

in horovod/mxnet/mpi_ops.cc [252:318]
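
This helper implements a Horovod operation for GPU tensors on builds without GPU-aware collectives: it stages the input through host memory, pushes the operation to the MXNet engine against the CPU copies, and copies the result back to the GPU output.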


inline void PushHorovodOperationCudaOnCPU(OperationType op_type, NDArray* input,
                                          NDArray* output, const char* name,
                                          int priority, int root_rank = -1,
                                          bool average = true,
                                          NDArray* splits = nullptr,
                                          double prescale_factor = 1.0,
                                          double postscale_factor = 1.0) {
  auto op_type_name = GetOpTypeName(op_type);
  auto op_name = GetOpName(op_type_name, name);

  // Allocate CPU staging tensors on the default CPU device; both use the
  // input's dtype, and the output staging tensor starts empty (it is resized
  // by the out-of-place path below).
  auto cpu_input_tensor = std::make_shared<NDArray>(
      Context::Create(Context::kCPU, 0), input->dtype());
  auto cpu_output_tensor = std::make_shared<NDArray>(
      Context::Create(Context::kCPU, 0), input->dtype());

  // Asynchronously copy the GPU input into the CPU staging tensor.
  TensorUtil::AsyncCopyCudaToCPU(input, cpu_input_tensor.get());

  std::shared_ptr<NDArray> splits_tensor;
  if (splits) {
    // We expect splits to be a tensor on CPU. Create CPU copy if required.
    if (!IsTensorOnCPU(splits)) {
      splits_tensor = std::make_shared<NDArray>(
          Context::Create(Context::kCPU, 0), splits->dtype());
      TensorUtil::AsyncCopyCudaToCPU(splits, splits_tensor.get());
    } else {
      splits_tensor = std::make_shared<NDArray>(*splits);
    }
  }

  // Bundle the arguments for the engine callback; the shared_ptrs keep the
  // CPU staging tensors alive until DeleteMpiOpsParam releases them.
  auto ops_param = CreateMpiOpsParam(nullptr, nullptr, output, cpu_input_tensor,
                                     cpu_output_tensor, op_type, op_name, root_rank,
                                     average, splits_tensor, prescale_factor, postscale_factor);

  // Engine variables used to express read/write dependencies for the
  // asynchronous pushes below. (input->var() and output->var() are not
  // needed here; dependencies on the GPU tensors are carried by the copies.)
  auto cpu_input_var = cpu_input_tensor->var();
  auto cpu_output_var = cpu_output_tensor->var();
  if (op_type == OperationType::ALLGATHER ||
      op_type == OperationType::ALLTOALL) {
    // Use out-of-place path for operations that have unknown output size (allgather, alltoall)
    std::vector<void*> input_vars {cpu_input_var};
    if (splits) {
      // Add splits tensor to input list to enforce dependency on possible async D2H copy
      input_vars.push_back(splits_tensor->var());
    }

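    // Hand the CPU-side operation to the MXNet engine: it reads the staging
    // input (plus the splits var, if present) and mutates cpu_output_var.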
    MXEnginePushAsync(DoHorovodOperationCudaOnCPU, ops_param, DeleteMpiOpsParam,
                      &MX_EXEC_CTX, input_vars.data(), input_vars.size(), &cpu_output_var, 1,
                      &MX_FUNC_PROP, priority, op_type_name);

    // The out-of-place path resizes cpu_output_tensor, so block here until
    // the operation completes before copying the result back.
    cpu_output_tensor->WaitToRead();

    // Asynchronously copy the CPU staging output to the GPU output tensor.
    TensorUtil::AsyncCopyCPUToCuda(cpu_output_tensor.get(), output);
  } else {
    // In-place path: the engine op mutates cpu_input_var directly.
    MXEnginePushAsync(DoHorovodOperationCudaOnCPU, ops_param, DeleteMpiOpsParam,
                      &MX_EXEC_CTX, nullptr, 0, &cpu_input_var, 1,
                      &MX_FUNC_PROP, priority, op_type_name);

    // The in-place result lives in cpu_input_tensor; asynchronously copy it
    // back to the GPU output tensor.
    TensorUtil::AsyncCopyCPUToCuda(cpu_input_tensor.get(), output);
  }
}
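
For context, a minimal sketch of how a front-end entry point can dispatch to this helper. The entry-point name and guard macros follow the pattern Horovod uses in mpi_ops.cc, but this is a sketch under those assumptions, not a verbatim copy; the real entry points also forward splits and scaling factors.

// Hypothetical dispatch sketch: GPU tensors take the CPU-staging path when
// the build lacks GPU-aware allreduce (HOROVOD_GPU_ALLREDUCE undefined).
extern "C" int horovod_mxnet_allreduce_async(NDArray* input, NDArray* output,
                                             const char* name, bool average,
                                             int priority) {
  MX_API_BEGIN();
#if HAVE_CUDA && !HOROVOD_GPU_ALLREDUCE
  if (IsTensorOnCPU(input) && IsTensorOnCPU(output)) {
    // Tensors already live on the host: push directly, no staging needed.
    PushHorovodOperation(OperationType::ALLREDUCE, input, output,
                         name, priority, -1, average);
  } else {
    // GPU tensors with CPU-only collectives: stage through host memory.
    PushHorovodOperationCudaOnCPU(OperationType::ALLREDUCE, input, output,
                                  name, priority, -1, average);
  }
#else
  // GPU-aware collectives (or a CPU-only build): no staging required.
  PushHorovodOperation(OperationType::ALLREDUCE, input, output,
                       name, priority, -1, average);
#endif
  MX_API_END();
}

The staging copies themselves are delegated to TensorUtil. A plausible shape for the D2H helper, assuming MXNet's NDArray::ReshapeAndAlloc and CopyFromTo (the actual implementation lives in horovod/mxnet/tensor_util.cc and may differ):

// Hypothetical sketch: size the host destination to match the device source,
// then enqueue an asynchronous device-to-host copy on the MXNet engine.
void AsyncCopyCudaToCPU(NDArray* cuda, NDArray* cpu) {
  cpu->ReshapeAndAlloc(cuda->shape());     // allocate a host buffer of matching shape
  CopyFromTo(*cuda, cpu, /*priority=*/0);  // engine schedules the copy asynchronously
}

This division of labor keeps PushHorovodOperationCudaOnCPU free of raw CUDA calls: every copy and every Horovod callback is expressed as an engine operation, so ordering is enforced by the NDArray engine variables rather than explicit stream synchronization, with the lone exception of the WaitToRead() required because the allgather/alltoall output size is unknown until the operation runs.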