seastar::future<> SeastarClient::Connection::Read()

in tensorflow_networking/seastar/seastar_client.cc [21:97]


// Reads one response from `read_buf_` and hands it to the matching client tag.
//
// Flow:
//   1. Read a header of SeastarClientTag::HEADER_SIZE bytes and build a tag
//      from it through `tag_factory_`.
//   2. Depending on the tag:
//        * `status_ != 0`   -> read `resp_err_msg_len_` bytes of error text and
//                              complete the tag with that status code/message.
//        * `IsRecvTensor()` -> read the response message, let the tag parse it,
//                              then read the tensor payload (if any).
//        * otherwise        -> read the response body (if any).
//   3. Once a tag exists, every path ends by calling `RecvRespDone` on it.
//
// Every read is validated against the requested length before its bytes are
// used; a short read is reported to the tag as error::UNKNOWN instead of being
// copied/parsed as if it were complete.
//
// Returns a future that resolves once the response has been handled. An empty
// or truncated header resolves immediately without creating a tag.
seastar::future<> SeastarClient::Connection::Read() {
  return read_buf_.read_exactly(SeastarClientTag::HEADER_SIZE)
      .then([this](auto&& header) {
        if (header.size() == 0) {
          // Nothing was read; there is no response to dispatch.
          return seastar::make_ready_future();
        }
        if (header.size() !=
            static_cast<size_t>(SeastarClientTag::HEADER_SIZE)) {
          // A partial header cannot be decoded into a tag safely.
          LOG(WARNING) << "Expected header size is:"
                       << SeastarClientTag::HEADER_SIZE
                       << ", but real header size:" << header.size();
          return seastar::make_ready_future();
        }

        auto tag = tag_factory_->CreateSeastarClientTag(header);
        if (tag->status_ != 0) {
          // handle error response
          return read_buf_.read_exactly(tag->resp_err_msg_len_)
              .then([tag](auto&& err_msg) {
                if (err_msg.size() !=
                    static_cast<size_t>(tag->resp_err_msg_len_)) {
                  LOG(WARNING)
                      << "Expected read size is:" << tag->resp_err_msg_len_
                      << ", but real error msg size:" << err_msg.size();
                }
                // Build the message from the bytes actually received, never
                // from the advertised length, so a short read cannot run past
                // the end of the buffer.
                std::string msg =
                    err_msg.size() == 0
                        ? std::string("Empty error msg.")
                        : std::string(err_msg.get(), err_msg.size());
                tag->RecvRespDone(
                    Status(static_cast<error::Code>(tag->status_), msg));
                return seastar::make_ready_future();
              });
        }

        if (tag->IsRecvTensor()) {
          // handle tensor response
          auto message_size = tag->GetResponseMessageSize();
          auto message_buffer = tag->GetResponseMessageBuffer();
          return read_buf_.read_exactly(message_size)
              .then([this, tag, message_size, message_buffer](auto&& message) {
                if (message.size() != message_size) {
                  // Do not parse a partially filled message buffer.
                  LOG(WARNING) << "Expected read size is:" << message_size
                               << ", but real size is:" << message.size();
                  tag->RecvRespDone(tensorflow::Status(
                      error::UNKNOWN, "Seastar Client: read invalid msgbuf"));
                  return seastar::make_ready_future();
                }
                if (message.size() > 0) {
                  memcpy(message_buffer, message.get(), message.size());
                }
                // The tensor size/buffer are queried only after the message
                // has been parsed, matching the original call order.
                tag->ParseMessage();
                auto tensor_size = tag->GetResponseTensorSize();
                auto tensor_buffer = tag->GetResponseTensorBuffer();
                if (tensor_size == 0) {
                  tag->RecvRespDone(tensorflow::Status());
                  return seastar::make_ready_future();
                }
                return read_buf_.read_exactly(tensor_size)
                    .then([tag, tensor_size, tensor_buffer](auto&& tensor) {
                      if (tensor.size() != tensor_size) {
                        LOG(WARNING)
                            << "Expected read size is:" << tensor_size
                            << ", but real tensor size:" << tensor.size();
                        tag->RecvRespDone(
                            Status(error::UNKNOWN,
                                   "Seastar Client: read invalid tensorbuf"));
                        return seastar::make_ready_future();
                      }
                      memcpy(tensor_buffer, tensor.get(), tensor.size());
                      tag->RecvRespDone(tensorflow::Status());
                      return seastar::make_ready_future();
                    });
              });
        }

        // handle general response
        auto resp_body_size = tag->GetResponseBodySize();
        if (resp_body_size == 0) {
          tag->RecvRespDone(tensorflow::Status());
          return seastar::make_ready_future();
        }

        auto resp_body_buffer = tag->GetResponseBodyBuffer();
        return read_buf_.read_exactly(resp_body_size)
            .then([tag, resp_body_size, resp_body_buffer](auto&& body) {
              if (body.size() != resp_body_size) {
                LOG(WARNING) << "Expected read size is:" << resp_body_size
                             << ", but real size is:" << body.size();
                tag->RecvRespDone(tensorflow::Status(
                    error::UNKNOWN, "Seastar Client: read invalid msgbuf"));
                return seastar::make_ready_future();
              }
              memcpy(resp_body_buffer, body.get(), body.size());
              tag->RecvRespDone(tensorflow::Status());
              return seastar::make_ready_future();
            });
      });
}