in include/ylt/coro_rpc/impl/coro_connection.hpp [208:369]
// Per-connection request loop (coroutine).
//
// Repeatedly reads a request header and then its payload from `socket`,
// resolves the route key through `router`, and dispatches either to a plain
// handler (`router.route`) or to a coroutine handler (`router.route_coro`).
// The loop ends on a header/payload read error, on an unrecognised serialize
// protocol, or (when UNIT_TEST_INJECT is defined) on an injected test action.
// After the loop, `close()` and `cancel_timer()` are called.
//
// router: used to look up handlers; also passed to every context_info_t that
//         is created here.
// socket: the stream that headers and payloads are read from (and, in one
//         injected test path, a response header is written to).
async_simple::coro::Lazy<void> start_impl(
typename rpc_protocol::router &router, Socket &socket) noexcept {
// Per-request state shared with handlers; it is reused across iterations
// unless the previous request was flagged as "return by callback" (see below).
auto context_info = std::make_shared<context_info_t<rpc_protocol>>(
router, shared_from_this());
// NOTE(review): reset_timer() is defined elsewhere; presumably it arms the
// timeout that guards the wait for the first request — confirm.
reset_timer();
// `magic_number` is handed to read_head on every iteration. When no transfer
// callback is installed it is preset to "SKIP"; presumably read_head treats
// that value as "do not capture the magic bytes" — confirm against
// rpc_protocol::read_head.
std::string magic_number;
if (transfer_callback_ == nullptr) {
magic_number = "SKIP";
}
for (int64_t req_count = 0;; req_count++) {
// Read into a temporary header first; it is moved into context_info only
// after the context for this request has been chosen.
typename rpc_protocol::req_header req_head_tmp;
// timer will be reset after rpc call response
auto ec =
co_await rpc_protocol::read_head(socket, req_head_tmp, magic_number);
// `co_await async_read` is built on asio::async_read.
// If EOF occurs, the bytes transferred by `co_await async_read` must be
// fewer than RPC_HEAD_LEN, and the incomplete data is discarded.
// So no special handling of EOF is required.
if (ec) [[unlikely]] {
ELOG_INFO << "connection " << conn_id_ << " close: " << ec.message();
// Only for the very first request, and only when magic_number is non-empty
// (presumably filled in by read_head) and a transfer callback exists, hand
// the socket wrapper over to that callback.
// NOTE(review): socket_wrapper_ is moved from here, yet close() still runs
// after the loop — confirm close() is safe on a moved-from socket_wrapper_.
if (req_count == 0 && magic_number.size() && transfer_callback_) {
ELOG_INFO << "The connection is not coro_rpc connection.";
(*transfer_callback_)(std::move(socket_wrapper_), magic_number);
}
break;
}
// The connection is never transferred after the first magic-number check.
magic_number = "SKIP";
#ifdef UNIT_TEST_INJECT
// Test-only: record the request's seq_num as the client id for logging.
client_id_ = req_head_tmp.seq_num;
ELOG_INFO << "conn_id " << conn_id_ << " client_id " << client_id_;
#endif
#ifdef UNIT_TEST_INJECT
// Test-only: leave the loop right after the header has been read.
if (g_action == inject_action::close_socket_after_read_header) {
ELOG_WARN << "inject action: close_socket_after_read_header, conn_id "
<< conn_id_ << ", client_id " << client_id_;
break;
}
#endif
// Try to reuse the context from the previous iteration.
if (is_rpc_return_by_callback_) {
// The previous request was flagged as "return by callback", so its context
// cannot be reused; clear the flag and allocate a new context.
is_rpc_return_by_callback_ = false;
if (context_info->status_ != context_status::finish_response) {
// The previous response has not finished, so its buffers cannot be reused.
context_info = std::make_shared<context_info_t<rpc_protocol>>(
router, shared_from_this());
}
else {
// The previous response has finished: move its body and attachment strings
// into the new context so their storage is reused.
context_info = std::make_shared<context_info_t<rpc_protocol>>(
router, shared_from_this(), std::move(context_info->req_body_),
std::move(context_info->req_attachment_));
}
}
// Short aliases into the context selected for this request.
auto &req_head = context_info->req_head_;
auto &body = context_info->req_body_;
auto &req_attachment = context_info->req_attachment_;
auto &key = context_info->key_;
req_head = std::move(req_head_tmp);
// Stop serving this connection if the header names no known serialize
// protocol.
auto serialize_proto = rpc_protocol::get_serialize_protocol(req_head);
if (!serialize_proto.has_value())
AS_UNLIKELY {
ELOG_ERROR << "bad serialize protocol type, conn_id " << conn_id_;
break;
}
std::string_view payload;
// rpc_protocol::buffer_type may come from the user; by default it comes from
// the framework.
ec = co_await rpc_protocol::read_payload(socket, req_head, body,
req_attachment);
// The timer is cancelled whether or not the payload read succeeded; the
// error is checked only afterwards.
cancel_timer();
// `payload` is a non-owning view of the body stored in context_info.
payload = std::string_view{body};
if (ec)
AS_UNLIKELY {
ELOG_ERROR << "read error: " << ec.message() << ", conn_id "
<< conn_id_;
break;
}
key = rpc_protocol::get_route_key(req_head);
auto handler = router.get_handler(key);
// Counted before dispatch, on both the plain and the coroutine path.
++rpc_processing_cnt_;
if (!handler) {
// No plain handler is registered for this key: dispatch through the
// coroutine handler and mark the request as "return by callback", so the next
// iteration allocates a fresh context instead of reusing this one.
auto coro_handler = router.get_coro_handler(key);
set_rpc_return_by_callback();
router.route_coro(coro_handler, payload, serialize_proto.value(), key)
.template setLazyLocal<connection_lazy_ctx<rpc_protocol>>(
context_info)
.directlyStart(
[context_info](auto &&result) mutable {
// Completion callback: `result.value()` holds the (error code, response
// buffer) pair produced by route_coro.
std::pair<coro_rpc::err_code, std::string> &ret =
result.value();
if (ret.first)
AS_UNLIKELY {
ELOGI << "rpc error in function:"
<< context_info->get_rpc_function_name()
<< ". error code:" << ret.first.ec
<< ". message : " << ret.second;
}
// Send the response from a task scheduled on the connection's executor.
// context_info (which holds conn_) and the result pair are moved into that
// task so they stay alive until it runs.
auto executor = context_info->conn_->get_executor();
executor->schedule([context_info = std::move(context_info),
ret = std::move(ret)]() mutable {
context_info->conn_
->template direct_response_msg<rpc_protocol>(
ret.first, ret.second, context_info->req_head_,
std::move(context_info->resp_attachment_),
std::move(context_info->complete_handler_));
});
},
socket_wrapper_.get_executor());
}
else {
// Plain handler path: publish the raw context pointer through set_context
// and run the handler inline via router.route.
coro_rpc::detail::set_context<rpc_protocol>() = context_info.get();
auto &&[resp_err, resp_buf] = router.route(
handler, payload, context_info, serialize_proto.value(), key);
// NOTE(review): the flag was false before router.route, so if it is set here
// it was presumably set during the handler call (handler takes over the
// response) — confirm.
if (is_rpc_return_by_callback_) {
if (!resp_err) {
// No error: nothing is sent from here; go on to the next request.
continue;
}
else {
// Routing failed: log it, clear the flag, and fall through so the error is
// sent by direct_response_msg below.
ELOGI << "rpc error in function:"
<< context_info->get_rpc_function_name()
<< ". error code:" << resp_err.ec
<< ". message : " << resp_buf;
is_rpc_return_by_callback_ = false;
}
}
#ifdef UNIT_TEST_INJECT
// Test-only: write just the buffer returned by prepare_response, then leave
// the loop.
if (g_action == inject_action::close_socket_after_send_length) {
ELOG_WARN
<< "inject action: close_socket_after_send_length , conn_id "
<< conn_id_ << ", client_id " << client_id_;
std::string header_buf = rpc_protocol::prepare_response(
resp_buf, req_head, 0, resp_err, "");
co_await coro_io::async_write(socket, asio::buffer(header_buf));
break;
}
// Test-only: corrupt the first byte of the response buffer.
// NOTE(review): assumes resp_buf is non-empty at this point — confirm.
if (g_action == inject_action::server_send_bad_rpc_result) {
ELOG_WARN << "inject action: server_send_bad_rpc_result , conn_id "
<< conn_id_ << ", client_id " << client_id_;
resp_buf[0] = resp_buf[0] + 1;
}
#endif
direct_response_msg<rpc_protocol>(
resp_err, resp_buf, req_head,
std::move(context_info->resp_attachment_),
std::move(context_info->complete_handler_));
// resp_attachment_ was moved from just above; reset it to a callable that
// returns an empty view before the context is used for the next request.
context_info->resp_attachment_ = [] {
return std::string_view{};
};
}
}
// Reached only through a `break` above.
close();
cancel_timer();
}