in tensorflow_networking/seastar/seastar_worker_service.cc [239:305]
// Serves a RecvTensor RPC: parses the rendezvous key, locates the source
// device, and asynchronously fetches the locally-produced tensor, copying it
// from GPU to host memory first when necessary.
//
// `request` and `response` must outlive the async completion; `done` is
// invoked exactly once with the final status.
void SeastarWorker::RecvTensorAsync(CallOptions* opts,
                                    const RecvTensorRequest* request,
                                    SeastarTensorResponse* response,
                                    StatusCallback done) {
  const int64 step_id = request->step_id();
  const string& key = request->rendezvous_key();
  Rendezvous::ParsedKey parsed;
  Status s = Rendezvous::ParseKey(key, &parsed);
  Device* src_dev = nullptr;
  if (s.ok()) {
    s = PrepareRecvTensor(parsed, &src_dev);
  }
  if (!s.ok()) {
    // A malformed key or unknown source device is a per-request failure:
    // report it to the caller and bail out. (Previously this LOG(FATAL)'d,
    // aborting the whole worker process on any single bad request.)
    LOG(ERROR) << "PrepareRecvTensor failed, tensor:" << key
               << " status: " << s.error_message();
    done(s);
    return;
  }

  env_->rendezvous_mgr->RecvLocalAsync(
      step_id, parsed,
      [request, response, done, src_dev, key](
          const Status& status, const Rendezvous::Args& send_args,
          const Rendezvous::Args& recv_args, const Tensor& val,
          const bool is_dead) {
        if (!status.ok()) {
          LOG(WARNING)
              << "env_->rendezvous_mgr->RecvLocalAsync failed, error msg is: "
              << status.error_message();
          done(status);
          return;
        }
        response->SetIsDead(is_dead);
        const bool can_memcpy = DataTypeCanUseMemcpy(val.dtype());
        if (src_dev->tensorflow_gpu_device_info() &&
            !send_args.alloc_attrs.on_host()) {
          // Tensor resides in device (GPU) memory: stage it through a
          // host-side copy before handing it to the response.
          CHECK(send_args.device_context)
              << "send dev name: " << src_dev->name()
              << " gpu_info: " << src_dev->tensorflow_gpu_device_info();
          AllocatorAttributes alloc_attrs;
          alloc_attrs.set_gpu_compatible(true);
          alloc_attrs.set_on_host(true);
          Allocator* alloc = src_dev->GetAllocator(alloc_attrs);
          // Heap-allocated because the copy completes asynchronously; the
          // completion callback below owns and frees it on every path.
          Tensor* cpu_copy = new Tensor(alloc, val.dtype(), val.shape());
          send_args.device_context->CopyDeviceTensorToCPU(
              &val, request->rendezvous_key(), src_dev, cpu_copy,
              [response, cpu_copy, done](const Status& s) {
                if (!s.ok()) {
                  // Propagate the copy failure to the caller instead of
                  // CHECK-crashing the worker, and avoid leaking cpu_copy.
                  LOG(ERROR) << "copy tensor from gpu failed: "
                             << s.error_message();
                  delete cpu_copy;
                  done(s);
                  return;
                }
                response->SetTensor(*cpu_copy);
                delete cpu_copy;
                done(s);
              });
        } else {
          // Tensor is already in CPU memory.
          response->SetTensor(val);
          if (!can_memcpy) {
            // Non-memcpyable dtypes (e.g. string/variant) must be serialized
            // into the response proto rather than sent as raw bytes.
            val.AsProtoTensorContent(&response->GetTensorProto());
          }
          done(Status());
        }
      });
}