void SeastarWorker::RecvTensorAsync()

in tensorflow_networking/seastar/seastar_worker_service.cc [239:305]


void SeastarWorker::RecvTensorAsync(CallOptions* opts,
                                    const RecvTensorRequest* request,
                                    SeastarTensorResponse* response,
                                    StatusCallback done) {
  const int64 step_id = request->step_id();
  const string& key = request->rendezvous_key();
  Rendezvous::ParsedKey parsed;

  Status s = Rendezvous::ParseKey(key, &parsed);
  Device* src_dev = nullptr;
  if (s.ok()) {
    s = PrepareRecvTensor(parsed, &src_dev);
  }
  if (!s.ok()) {
    done(s);
    LOG(FATAL) << "PrepareRecvTensor failed, tensor:" << key;
  }

  env_->rendezvous_mgr->RecvLocalAsync(
      step_id, parsed,
      [opts, request, response, done, src_dev, key](
          const Status& status, const Rendezvous::Args& send_args,
          const Rendezvous::Args& recv_args, const Tensor& val,
          const bool is_dead) {
        if (!status.ok()) {
          LOG(WARNING)
              << "env_->rendezvous_mgr->RecvLocalAsync failed, error msg is: "
              << status.error_message();
        }

        if (status.ok()) {
          response->SetIsDead(is_dead);
          bool can_memcpy = DataTypeCanUseMemcpy(val.dtype());

          if (src_dev->tensorflow_gpu_device_info() &&
              (!send_args.alloc_attrs.on_host())) {
            CHECK(send_args.device_context)
                << "send dev name: " << src_dev->name()
                << " gpu_info: " << src_dev->tensorflow_gpu_device_info();

            AllocatorAttributes alloc_attrs;
            alloc_attrs.set_gpu_compatible(true);
            alloc_attrs.set_on_host(true);
            Allocator* alloc = src_dev->GetAllocator(alloc_attrs);
            Tensor* cpu_copy = new Tensor(alloc, val.dtype(), val.shape());

            send_args.device_context->CopyDeviceTensorToCPU(
                &val, request->rendezvous_key(), src_dev, cpu_copy,
                [response, cpu_copy, done](const Status& s) {
                  CHECK(s.ok()) << "copy tensor from gpu sync";
                  response->SetTensor(*cpu_copy);
                  delete cpu_copy;
                  done(s);
                });
          } else {
            // tensor is in CPU memory.
            response->SetTensor(val);
            if (!can_memcpy) {
              val.AsProtoTensorContent(&response->GetTensorProto());
            }
            done(Status());
          }
        } else {
          done(status);
        }
      });
}