void read()

in src/rpc.cc [933:1010]


  // Issue a single asynchronous read on this connection.
  //
  // Wire protocol (as visible here): the peer first sends a descriptor whose
  // 4-byte metadata is the serialized uint32_t payload size, and whose tensor
  // list holds the serialized buffer itself (tensor 0) plus any number of
  // attached tensors. We allocate storage, let tensorpipe fill it, then
  // deserialize each tensor's dtype/sizes/strides from the buffer's metadata
  // region, hand the finished buffer to onData(), and re-arm by calling
  // read() again.
  //
  // `me` keeps this object alive across the async callbacks; ownership is
  // moved into each nested lambda in turn. Any validation failure must
  // report via onError() exactly once and issue no further reads.
  void read(Me<RpcConnectionImpl>&& me) {
    rpc.log("read %s :: %p\n", connectionTypeName[index<API>], (void*)this);
    API::cast(connection)
        .readDescriptor([me = std::move(me)](auto&& error, tensorpipe_moorpc::Message msg) mutable noexcept {
          me->rpc.log("%s :: %p got data\n", connectionTypeName[index<API>], (void*)&*me);
          if (me->dead.load(std::memory_order_relaxed)) {
            me->rpc.log("already dead!\n");
            return;
          }
          if (error) {
            me->onError(error);
            return;
          }
          // Metadata must be exactly the serialized uint32_t size, and
          // tensor 0 (the serialized buffer) must be present.
          if (msg.metadata.size() != 4 || msg.tensors.size() == 0) {
            me->onError("Received Invalid data");
            return;
          }
          uint32_t size;
          deserializeBuffer(msg.metadata.data(), 4, size);
          BufferHandle buffer = makeBuffer(size, msg.tensors.size() - 1);
          bool valid = true;
          // Tensor 0 must be a CPU buffer whose length matches exactly the
          // region makeBuffer allocated (data through the end of the tensor
          // metadata offsets table). Only touch the .cpu view of the union
          // once the type is known to be kCpu.
          if (msg.tensors[0].buffer.type == tensorpipe_moorpc::DeviceType::kCpu) {
            msg.tensors[0].buffer.cpu.ptr = buffer->data();
            if (msg.tensors[0].buffer.cpu.length !=
                size_t((std::byte*)(buffer->tensorMetaDataOffsets() + buffer->nTensors) - buffer->data())) {
              valid = false;
            }
          } else {
            valid = false;
          }
          // Allocate backing storage for every attached tensor. An
          // unsupported device type aborts the whole message immediately:
          // previously the loop kept going after onError and could still
          // issue the read below with `allocators` shorter than the tensor
          // count, indexing allocators[i] out of bounds in the completion
          // callback (and reporting onError more than once).
          std::vector<Allocator> allocators;
          allocators.reserve(msg.tensors.size() - 1);
          for (size_t i = 1; i != msg.tensors.size(); ++i) {
            auto& tensor = msg.tensors[i];
            if (tensor.buffer.type == tensorpipe_moorpc::DeviceType::kCpu) {
              allocators.emplace_back(rpc::kCPU, tensor.buffer.cpu.length);
              tensor.buffer.cpu.ptr = allocators.back().data();
#ifdef USE_CUDA
            } else if (tensor.buffer.type == tensorpipe_moorpc::DeviceType::kCuda) {
              // NOTE(review): this reads/writes the .cpu view of the buffer
              // union for a kCuda tensor; presumably the moorpc fork lays
              // cpu/cuda out identically -- confirm against tensorpipe_moorpc.
              allocators.emplace_back(rpc::kCUDA, tensor.buffer.cpu.length);
              tensor.buffer.cpu.ptr = allocators.back().data();
#endif
            } else {
              me->onError("Received invalid tensor device type");
              return;
            }
          }
          if (!valid) {
            me->onError("Received invalid data (2)");
            return;
          }
          API::cast(me->connection)
              .read(
                  std::move(msg),
                  [allocators = std::move(allocators), buffer = std::move(buffer), me = std::move(me)](
                      auto&& error, [[maybe_unused]] tensorpipe_moorpc::Message msg) mutable noexcept {
                    if (error) {
                      me->onError(error);
                      return;
                    }
                    // The buffer's metadata region stores, per tensor, its
                    // serialized dtype/sizes/strides at the recorded offset;
                    // deserialize each and bind the storage allocated above
                    // into the corresponding tensor slot.
                    auto* offsets = buffer->tensorMetaDataOffsets();
                    auto* tensors = buffer->tensors();
                    auto* data = buffer->data();
                    size_t nTensors = buffer->nTensors;
                    size_t len = buffer->size;
                    for (size_t i = 0; i != nTensors; ++i) {
                      Tensor& t = tensors[i].tensor;
                      decltype(t.scalar_type()) dtype;
                      decltype(t.sizes()) sizes;
                      decltype(t.strides()) strides;
                      deserializeBufferPart(
                          data + offsets[i], len > offsets[i] ? len - offsets[i] : 0, dtype, sizes, strides);
                      t = allocators[i].set(dtype, sizes, strides);
                    }
                    me->onData(std::move(buffer));

                    // Re-arm: post the next asynchronous read.
                    me->read(std::move(me));
                  });
        });
  }