in src/rpc.cc [933:1010]
void read(Me<RpcConnectionImpl>&& me) {
rpc.log("read %s :: %p\n", connectionTypeName[index<API>], (void*)this);
API::cast(connection)
.readDescriptor([me = std::move(me)](auto&& error, tensorpipe_moorpc::Message msg) mutable noexcept {
me->rpc.log("%s :: %p got data\n", connectionTypeName[index<API>], (void*)&*me);
if (me->dead.load(std::memory_order_relaxed)) {
me->rpc.log("already dead!\n");
return;
}
if (error) {
me->onError(error);
} else {
if (msg.metadata.size() != 4 || msg.tensors.size() == 0) {
me->onError("Received Invalid data");
} else {
uint32_t size;
deserializeBuffer(msg.metadata.data(), 4, size);
// me->rpc.log("got %d bytes\n", size);
BufferHandle buffer = makeBuffer(size, msg.tensors.size() - 1);
bool valid = true;
if (msg.tensors[0].buffer.type != tensorpipe_moorpc::DeviceType::kCpu) {
valid = false;
}
msg.tensors[0].buffer.cpu.ptr = buffer->data();
if (msg.tensors[0].buffer.cpu.length !=
size_t((std::byte*)(buffer->tensorMetaDataOffsets() + buffer->nTensors) - buffer->data())) {
valid = false;
}
std::vector<Allocator> allocators;
allocators.reserve(msg.tensors.size() - 1);
for (size_t i = 1; i != msg.tensors.size(); ++i) {
auto& tensor = msg.tensors[i];
if (tensor.buffer.type == tensorpipe_moorpc::DeviceType::kCpu) {
allocators.emplace_back(rpc::kCPU, tensor.buffer.cpu.length);
tensor.buffer.cpu.ptr = allocators.back().data();
#ifdef USE_CUDA
} else if (tensor.buffer.type == tensorpipe_moorpc::DeviceType::kCuda) {
allocators.emplace_back(rpc::kCUDA, tensor.buffer.cpu.length);
tensor.buffer.cpu.ptr = allocators.back().data();
#endif
} else
me->onError("Received invalid tensor device type");
}
if (valid) {
API::cast(me->connection)
.read(
std::move(msg),
[allocators = std::move(allocators), buffer = std::move(buffer), me = std::move(me)](
auto&& error, [[maybe_unused]] tensorpipe_moorpc::Message msg) mutable noexcept {
if (error) {
me->onError(error);
} else {
auto* offsets = buffer->tensorMetaDataOffsets();
auto* tensors = buffer->tensors();
auto* data = buffer->data();
size_t nTensors = buffer->nTensors;
size_t len = buffer->size;
for (size_t i = 0; i != nTensors; ++i) {
Tensor& t = tensors[i].tensor;
decltype(t.scalar_type()) dtype;
decltype(t.sizes()) sizes;
decltype(t.strides()) strides;
deserializeBufferPart(
data + offsets[i], len > offsets[i] ? len - offsets[i] : 0, dtype, sizes, strides);
t = allocators[i].set(dtype, sizes, strides);
}
me->onData(std::move(buffer));
me->read(std::move(me));
}
});
} else {
me->onError("Received invalid data (2)");
}
}
}
});
}