in tensorflow_networking/seastar/seastar_client.cc [21:97]
// Reads one response from read_buf_ and hands it to the tag created from the
// response header.
//
// Bytes are consumed in this order:
//   1. A header of SeastarClientTag::HEADER_SIZE bytes, turned into a tag by
//      tag_factory_->CreateSeastarClientTag().
//   2. Depending on the tag:
//        * status_ != 0     -> an error message of resp_err_msg_len_ bytes;
//        * IsRecvTensor()   -> a message of GetResponseMessageSize() bytes,
//                              then (if non-zero) a tensor payload of
//                              GetResponseTensorSize() bytes;
//        * otherwise        -> a body of GetResponseBodySize() bytes.
//
// seastar::input_stream::read_exactly() resolves with FEWER bytes than
// requested when the stream hits EOF, so every buffer is length-checked
// before it is copied or parsed.
//
// Returns a future that resolves after the tag's RecvRespDone() has been
// invoked, or right away when no complete header could be read.
seastar::future<> SeastarClient::Connection::Read() {
  return read_buf_.read_exactly(SeastarClientTag::HEADER_SIZE)
      .then([this](auto&& header) {
        if (header.size() == 0) {
          // Nothing was read at all.
          // NOTE(review): presumably the stream reached EOF and the caller
          // stops its read loop -- confirm against the call site.
          return seastar::make_ready_future();
        }
        const size_t expected_header_size = SeastarClientTag::HEADER_SIZE;
        if (header.size() != expected_header_size) {
          // A truncated header cannot be mapped to a tag, so there is no
          // RecvRespDone() to call; never parse partial header bytes.
          LOG(WARNING) << "Expected read size is:" << expected_header_size
                       << ", but real header size:" << header.size();
          return seastar::make_ready_future();
        }

        auto tag = tag_factory_->CreateSeastarClientTag(header);

        if (tag->status_ != 0) {
          // Error response: the header is followed by the error text.
          return read_buf_.read_exactly(tag->resp_err_msg_len_)
              .then([tag](auto&& err_msg) {
                if (err_msg.size() != tag->resp_err_msg_len_) {
                  LOG(WARNING)
                      << "Expected read size is:" << tag->resp_err_msg_len_
                      << ", but real error msg size:" << err_msg.size();
                }
                // Build the text from the bytes actually received; using the
                // advertised length would over-read a short buffer.
                std::string msg;
                if (err_msg.size() == 0) {
                  msg = "Empty error msg.";
                } else {
                  msg = std::string(err_msg.get(), err_msg.size());
                }
                tag->RecvRespDone(
                    Status(static_cast<error::Code>(tag->status_), msg));
                return seastar::make_ready_future();
              });
        }

        if (tag->IsRecvTensor()) {
          // Tensor response: message first, then the optional tensor payload.
          auto message_size = tag->GetResponseMessageSize();
          auto message_buffer = tag->GetResponseMessageBuffer();
          return read_buf_.read_exactly(message_size)
              .then([this, tag, message_size, message_buffer](auto&& message) {
                if (message.size() != message_size) {
                  // Do not parse a truncated message: its contents determine
                  // the size of the tensor read that would follow.
                  LOG(WARNING) << "Expected read size is:" << message_size
                               << ", but real size is:" << message.size();
                  tag->RecvRespDone(Status(
                      error::UNKNOWN, "Seastar Client: read invalid msgbuf"));
                  return seastar::make_ready_future();
                }
                memcpy(message_buffer, message.get(), message.size());
                tag->ParseMessage();

                // These are queried only after ParseMessage(), matching the
                // original statement order.
                auto tensor_size = tag->GetResponseTensorSize();
                auto tensor_buffer = tag->GetResponseTensorBuffer();
                if (tensor_size == 0) {
                  tag->RecvRespDone(tensorflow::Status());
                  return seastar::make_ready_future();
                }
                return read_buf_.read_exactly(tensor_size)
                    .then([tag, tensor_size, tensor_buffer](auto&& tensor) {
                      if (tensor.size() != tensor_size) {
                        LOG(WARNING)
                            << "Expected read size is:" << tensor_size
                            << ", but real tensor size:" << tensor.size();
                        tag->RecvRespDone(Status(
                            error::UNKNOWN,
                            "Seastar Client: read invalid tensorbuf"));
                        return seastar::make_ready_future();
                      }
                      memcpy(tensor_buffer, tensor.get(), tensor.size());
                      tag->RecvRespDone(tensorflow::Status());
                      return seastar::make_ready_future();
                    });
              });
        } else {
          // General (non-tensor) response: a single body blob.
          auto resp_body_size = tag->GetResponseBodySize();
          if (resp_body_size == 0) {
            tag->RecvRespDone(tensorflow::Status());
            return seastar::make_ready_future();
          }
          auto resp_body_buffer = tag->GetResponseBodyBuffer();
          return read_buf_.read_exactly(resp_body_size)
              .then([tag, resp_body_size, resp_body_buffer](auto&& body) {
                if (body.size() != resp_body_size) {
                  LOG(WARNING) << "Expected read size is:" << resp_body_size
                               << ", but real size is:" << body.size();
                  tag->RecvRespDone(tensorflow::Status(
                      error::UNKNOWN, "Seastar Client: read invalid msgbuf"));
                  return seastar::make_ready_future();
                }
                memcpy(resp_body_buffer, body.get(), body.size());
                tag->RecvRespDone(tensorflow::Status());
                return seastar::make_ready_future();
              });
        }
      });
}