in tensorflow_networking/verbs/rdma.cc [1038:1113]
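// Receive callback for a tensor request: invoked once the requested tensor
// is available from the local rendezvous. It resolves the source device,
// detects meta-data changes, and then picks one of three send paths:
// (1) a direct send from GDR-compatible GPU memory,
// (2) a GPU-to-CPU copy followed by a send, or
// (3) serialization into a TensorProto for types that cannot be memcpy'd.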
void RdmaTensorResponse::RecvHandler(Rendezvous::ParsedKey parsed,
const Rendezvous::Args& send_args,
const Rendezvous::Args& recv_args,
const Tensor& in, bool is_dead) {
Status s = PrepareRecvTensor(parsed, &src_dev_);
if (!s.ok()) {
SendErrorStatus(s);
return;
}
meta_data_changed_ = TensorMetaDataChanged(in, is_dead);
#ifdef RDMA_DATA_VALIDATION
// Always send a meta-data message carrying the source checksum.
meta_data_changed_ = rm_.type_ == RDMA_MESSAGE_TENSOR_REQUEST;
checksum_ = Checksum(src_dev_, send_args.device_context, in);
#endif
bool can_memcpy = DataTypeCanUseMemcpy(in.dtype());
// Tensors whose type cannot be memcpy'd (e.g. string) must be serialized
// into a TensorProto before they can be sent.
Tensor copy;
TensorProto proto;
const bool on_host = send_args.alloc_attrs.on_host();
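// The tensor lives in GPU memory only if the source device is a GPU and
// the send-side allocator attributes did not place it in host memory.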
if (src_dev_->tensorflow_gpu_device_info() && !on_host) {
#if GOOGLE_CUDA
DeviceContext* send_dev_context = send_args.device_context;
CHECK(send_dev_context)
<< "send dev name: " << src_dev_->name()
<< " gpu_info: " << src_dev_->tensorflow_gpu_device_info();
if (can_memcpy) {
// If the tensor is located on a GDR-compatible GPU, there is no need to
// copy it: we can send directly from the source, as long as we stay in
// sync with the GPU stream. If the tensor's meta-data has changed,
// however, it will have to be cloned, which already requires a GPU-to-CPU
// copy. If Clone() is ever changed to make only a shallow copy, the copy
// here could be skipped as well.
if ((in.TotalBytes() > 0) && !meta_data_changed_ &&
(RdmaMemoryMgr::Singleton().FindMemoryRegion(
(void*)DMAHelper::base(&in), in.TotalBytes()) != nullptr)) {
StreamGPUOp(src_dev_, send_dev_context,
[this, in, proto, is_dead](const Status& s) {
Send(in, proto, is_dead, s);
});
return;
}
// The tensor must be copied from GPU to CPU, because either:
// 1. The tensor is located on a non-GDR-compatible GPU, or
// 2. The tensor's meta-data has changed.
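// The staging buffer comes from the pinned (CUDA host) allocator of the
// GPU process state; presumably this also allows it to be registered as
// an RDMA memory region.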
Allocator* alloc = GPUProcessState::singleton()->GetGpuHostAllocator(0);
copy = Tensor(alloc, in.dtype(), in.shape());
CountCopies(rm_.name_, (void*)DMAHelper::base(&in),
(void*)DMAHelper::base(&copy), in.TotalBytes(), true);
GPUUtil::CopyGPUTensorToCPU(
src_dev_, send_dev_context, &in, &copy,
[this, copy, proto, is_dead](const Status& s) {
Send(copy, proto, is_dead, s);
});
} else {
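// Non-memcpy types (e.g. string) residing on the GPU are serialized
// straight into a TensorProto; SetProtoFromGPU handles the
// device-to-host transfer before invoking the done callback.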
GPUUtil::SetProtoFromGPU(
in, src_dev_, send_args.device_context, &proto, is_dead,
[this, in, proto, is_dead](const Status& s) mutable {
Send(in, proto, is_dead, s);
});
}
#else
SendErrorStatus(errors::Internal("No GPU device in process"));
#endif // GOOGLE_CUDA
} else {
// The tensor is already in CPU memory.
if (!can_memcpy) {
in.AsProtoTensorContent(&proto);
}
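// Send() presumably transmits either the raw tensor bytes or the
// serialized proto, depending on whether the data type can be memcpy'd.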
Send(in, proto, is_dead, Status::OK());
}
}