void RdmaTensorResponse::RecvHandler()

in tensorflow_networking/verbs/rdma.cc [1038:1113]


void RdmaTensorResponse::RecvHandler(Rendezvous::ParsedKey parsed,
                                     const Rendezvous::Args& send_args,
                                     const Rendezvous::Args& recv_args,
                                     const Tensor& in, bool is_dead) {
  Status s = PrepareRecvTensor(parsed, &src_dev_);
  if (!s.ok()) {
    SendErrorStatus(s);
    return;
  }

  meta_data_changed_ = TensorMetaDataChanged(in, is_dead);
#ifdef RDMA_DATA_VALIDATION
  // In data-validation builds, always send a meta-data message on a tensor
  // request so that the source checksum below reaches the receiver.
  meta_data_changed_ = rm_.type_ == RDMA_MESSAGE_TENSOR_REQUEST;
  checksum_ = Checksum(src_dev_, send_args.device_context, in);
#endif
  bool can_memcpy = DataTypeCanUseMemcpy(in.dtype());
  // Non-memcpy types (e.g. string tensors) must be serialized into a proto.
  Tensor copy;
  TensorProto proto;
  const bool on_host = send_args.alloc_attrs.on_host();
  if (src_dev_->tensorflow_gpu_device_info() && !on_host) {
#if GOOGLE_CUDA
    DeviceContext* send_dev_context = send_args.device_context;
    CHECK(send_dev_context)
        << "send dev name: " << src_dev_->name()
        << " gpu_info: " << src_dev_->tensorflow_gpu_device_info();

    if (can_memcpy) {
      // If the tensor is located on a GDR-compatible GPU, there is no need
      // to copy it: we can send directly from the source, as long as we
      // stay in sync with the GPU stream.
      // If the tensor's meta-data changed, however, we will need to clone
      // it, which means copying it from GPU to CPU first anyway. If Clone()
      // is ever changed to make only a shallow copy, this copy can be
      // skipped as well.
      if ((in.TotalBytes() > 0) && !meta_data_changed_ &&
          (RdmaMemoryMgr::Singleton().FindMemoryRegion(
               (void*)DMAHelper::base(&in), in.TotalBytes()) != nullptr)) {
        StreamGPUOp(src_dev_, send_dev_context,
                    [this, in, proto, is_dead](const Status& s) {
                      Send(in, proto, is_dead, s);
                    });
        return;
      }

      // The tensor must be copied from GPU to CPU, because either:
      // 1. The tensor is located on a non-GDR-compatible GPU.
      // 2. The tensor's meta-data has changed.
      Allocator* alloc = GPUProcessState::singleton()->GetGpuHostAllocator(0);
      copy = Tensor(alloc, in.dtype(), in.shape());
      CountCopies(rm_.name_, (void*)DMAHelper::base(&in),
                  (void*)DMAHelper::base(&copy), in.TotalBytes(), true);
      GPUUtil::CopyGPUTensorToCPU(
          src_dev_, send_dev_context, &in, &copy,
          [this, copy, proto, is_dead](const Status& s) {
            Send(copy, proto, is_dead, s);
          });
    } else {
      GPUUtil::SetProtoFromGPU(
          in, src_dev_, send_args.device_context, &proto, is_dead,
          [this, in, proto, is_dead](const Status& s) mutable {
            Send(in, proto, is_dead, s);
          });
    }
#else
    SendErrorStatus(errors::Internal("No GPU device in process"));
#endif  // GOOGLE_CUDA
  } else {
    // The tensor is in CPU memory.
    if (!can_memcpy) {
      in.AsProtoTensorContent(&proto);
    }
    Send(in, proto, is_dead, Status::OK());
  }
}
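The zero-copy GPU send above is gated on three conditions. Restated as a standalone predicate for clarity (a sketch that mirrors the condition in the function body; no such helper exists in this file):

// Sketch: the gate for sending directly from GPU memory over GDR.
bool CanSendDirectFromGpu(const Tensor& in, bool meta_data_changed) {
  return (in.TotalBytes() > 0) &&   // empty tensors carry no payload
         !meta_data_changed &&      // changed meta-data forces a clone anyway
         // A non-null region means the GPU buffer is already registered
         // with the RDMA memory manager, i.e. GDR-capable.
         (RdmaMemoryMgr::Singleton().FindMemoryRegion(
              (void*)DMAHelper::base(&in), in.TotalBytes()) != nullptr);
}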
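The CPU branch hinges on DataTypeCanUseMemcpy(): plain-old-data dtypes travel as raw bytes with an empty proto, while variable-length types such as DT_STRING must be flattened into a TensorProto first. A self-contained sketch of that decision (SerializeIfNeeded is a hypothetical helper; DataTypeCanUseMemcpy() and AsProtoTensorContent() are the TensorFlow APIs used above):

#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/types.h"

// Sketch: decide whether a CPU tensor can be sent by raw memcpy or must
// be serialized first, matching the CPU branch of RecvHandler().
void SerializeIfNeeded(const tensorflow::Tensor& in,
                       tensorflow::TensorProto* proto, bool* can_memcpy) {
  *can_memcpy = tensorflow::DataTypeCanUseMemcpy(in.dtype());
  if (!*can_memcpy) {
    // e.g. DT_STRING: there is no contiguous fixed-size buffer to memcpy.
    in.AsProtoTensorContent(proto);
  }
  // Otherwise the raw buffer at DMAHelper::base(&in) is transferred
  // directly and the proto stays empty.
}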