in include/ylt/coro_rpc/impl/coro_rpc_client.hpp [1150:1253]
async_simple::coro::Lazy<rpc_error> send_impl(Socket &socket, uint32_t &id,
                                              std::string_view req_attachment,
                                              Args &&...args) {
  // Serializes one request for `func` and writes it (plus the optional
  // attachment) to `socket`.
  //
  // socket          stream the request bytes are written to.
  // id              forwarded by reference to prepare_buffer<func>();
  //                 presumably filled in with the request id there -- confirm
  //                 against prepare_buffer.
  // req_attachment  extra bytes sent right after the serialized request in
  //                 the same write; nothing extra is sent when it is empty.
  // args            call arguments forwarded to prepare_buffer<func>().
  //
  // Returns a default-constructed rpc_error on success,
  // errc::message_too_large when prepare_buffer yields an empty buffer,
  // errc::timed_out when the write failed and control_->is_timeout_ is set,
  // and errc::io_error (carrying the error_code message) for any other write
  // failure. The client is close()d on every write failure.
  auto buffer = prepare_buffer<func>(id, std::forward<Args>(args)...);
  if (buffer.empty()) {
    co_return rpc_error{errc::message_too_large};
  }
#ifdef GENERATE_BENCHMARK_DATA
  // Dump the serialized request (without the attachment) to
  // "<benchmark_file_path><function name>.in".
  control_->func_name_ = get_func_name<func>();
  std::ofstream file(benchmark_file_path + control_->func_name_ + ".in",
                     std::ofstream::binary | std::ofstream::out);
  file << std::string_view{reinterpret_cast<const char *>(buffer.data()),
                           buffer.size()};
  file.close();
#endif
  std::pair<std::error_code, size_t> ret;
#ifdef UNIT_TEST_INJECT
  // Fault injection for unit tests, selected by the global g_action.
  if (g_action == inject_action::client_send_bad_header) {
    // Corrupt the first byte of the request before it is sent.
    buffer[0] = static_cast<std::byte>(static_cast<uint8_t>(buffer[0]) + 1);
  }
  if (g_action == inject_action::client_close_socket_after_send_header) {
    // Send exactly the header, then drop the connection.
    ret = co_await coro_io::async_write(
        socket, asio::buffer(buffer.data(), coro_rpc_protocol::REQ_HEAD_LEN));
    ELOG_INFO << "client_id " << config_.client_id << " close socket";
    close();
    co_return rpc_error{errc::io_error, ret.first.message()};
  }
  else if (g_action ==
           inject_action::client_close_socket_after_send_partial_header) {
    // Send the header minus its last byte, then drop the connection.
    ret = co_await coro_io::async_write(
        socket,
        asio::buffer(buffer.data(), coro_rpc_protocol::REQ_HEAD_LEN - 1));
    ELOG_INFO << "client_id " << config_.client_id << " close socket";
    close();
    co_return rpc_error{errc::io_error, ret.first.message()};
  }
  else if (g_action ==
           inject_action::client_shutdown_socket_after_send_header) {
    // Send exactly the header, then shut down only the sending direction.
    ret = co_await coro_io::async_write(
        socket, asio::buffer(buffer.data(), coro_rpc_protocol::REQ_HEAD_LEN));
    ELOG_INFO << "client_id " << config_.client_id << " shutdown";
    control_->socket_.shutdown(asio::ip::tcp::socket::shutdown_send);
    co_return rpc_error{errc::io_error, ret.first.message()};
  }
  else {
#endif
    // Take the write flag before touching the socket so that only one
    // send_impl at a time is inside async_write. While the flag is held by
    // someone else, yield by posting an empty task to the client's executor
    // and try again afterwards. The strong CAS is used on purpose: it fails
    // only when the flag is really held, so a spurious failure can never
    // cost a pointless trip through the executor.
    while (true) {
      bool expected = false;
      if (write_mutex_.compare_exchange_strong(expected, true)) {
        break;
      }
      co_await coro_io::post(
          []() {
          },
          &control_->executor_);
    }
    if (req_attachment.empty()) {
      ret = co_await coro_io::async_write(
          socket, asio::buffer(buffer.data(), buffer.size()));
    }
    else {
      // Gather write: request bytes followed by the attachment, without
      // copying them into one contiguous buffer.
      std::array<asio::const_buffer, 2> iov{
          asio::const_buffer{buffer.data(), buffer.size()},
          asio::const_buffer{req_attachment.data(), req_attachment.size()}};
      ret = co_await coro_io::async_write(socket, iov);
    }
    // Release the write flag whether or not the write succeeded.
    write_mutex_ = false;
#ifdef UNIT_TEST_INJECT
  }
  if (g_action == inject_action::force_inject_client_write_data_timeout) {
    // Makes a failed write below report timed_out instead of io_error.
    control_->is_timeout_ = true;
  }
  if (g_action == inject_action::client_close_socket_after_send_payload) {
    ELOG_INFO << "client_id " << config_.client_id
              << " client_close_socket_after_send_payload";
    close();
    co_return rpc_error{errc::io_error, ret.first.message()};
  }
#endif
  if (ret.first) {
    // The write failed: the connection is closed before the error is
    // reported.
    close();
    if (control_->is_timeout_) {
      co_return rpc_error{errc::timed_out};
    }
    else {
      co_return rpc_error{errc::io_error, ret.first.message()};
    }
  }
  co_return rpc_error{};
}