async_simple::coro::Lazy send_impl()

in include/ylt/coro_rpc/impl/coro_rpc_client.hpp [1150:1253]


  async_simple::coro::Lazy<rpc_error> send_impl(Socket &socket, uint32_t &id,
                                                std::string_view req_attachment,
                                                Args &&...args) {
    auto buffer = prepare_buffer<func>(id, std::forward<Args>(args)...);
    if (buffer.empty()) {
      co_return rpc_error{errc::message_too_large};
    }
#ifdef GENERATE_BENCHMARK_DATA
    control_->func_name_ = get_func_name<func>();
    std::ofstream file(benchmark_file_path + control_->func_name_ + ".in",
                       std::ofstream::binary | std::ofstream::out);
    file << std::string_view{(char *)buffer.data(), buffer.size()};
    file.close();
#endif
    std::pair<std::error_code, size_t> ret;
#ifdef UNIT_TEST_INJECT
    if (g_action == inject_action::client_send_bad_header) {
      buffer[0] = (std::byte)(uint8_t(buffer[0]) + 1);
    }
    if (g_action == inject_action::client_close_socket_after_send_header) {
      ret = co_await coro_io::async_write(
          socket, asio::buffer(buffer.data(), coro_rpc_protocol::REQ_HEAD_LEN));
      ELOG_INFO << "client_id " << config_.client_id << " close socket";
      close();
      co_return rpc_error{errc::io_error, ret.first.message()};
    }
    else if (g_action ==
             inject_action::client_close_socket_after_send_partial_header) {
      ret = co_await coro_io::async_write(
          socket,
          asio::buffer(buffer.data(), coro_rpc_protocol::REQ_HEAD_LEN - 1));
      ELOG_INFO << "client_id " << config_.client_id << " close socket";
      close();
      co_return rpc_error{errc::io_error, ret.first.message()};
    }
    else if (g_action ==
             inject_action::client_shutdown_socket_after_send_header) {
      ret = co_await coro_io::async_write(
          socket, asio::buffer(buffer.data(), coro_rpc_protocol::REQ_HEAD_LEN));
      ELOG_INFO << "client_id " << config_.client_id << " shutdown";
      control_->socket_.shutdown(asio::ip::tcp::socket::shutdown_send);
      co_return rpc_error{errc::io_error, ret.first.message()};
    }
    else {
#endif
      if (req_attachment.empty()) {
        while (true) {
          bool expected = false;
          if (write_mutex_.compare_exchange_weak(expected, true)) {
            break;
          }
          co_await coro_io::post(
              []() {
              },
              &control_->executor_);
        }
        ret = co_await coro_io::async_write(
            socket, asio::buffer(buffer.data(), buffer.size()));
        write_mutex_ = false;
      }
      else {
        std::array<asio::const_buffer, 2> iov{
            asio::const_buffer{buffer.data(), buffer.size()},
            asio::const_buffer{req_attachment.data(), req_attachment.size()}};
        while (true) {
          bool expected = false;
          if (write_mutex_.compare_exchange_weak(expected, true)) {
            break;
          }
          co_await coro_io::post(
              []() {
              },
              &control_->executor_);
        }
        ret = co_await coro_io::async_write(socket, iov);
        write_mutex_ = false;
      }
#ifdef UNIT_TEST_INJECT
    }
#endif
#ifdef UNIT_TEST_INJECT
    if (g_action == inject_action::force_inject_client_write_data_timeout) {
      control_->is_timeout_ = true;
    }
#endif
#ifdef UNIT_TEST_INJECT
    if (g_action == inject_action::client_close_socket_after_send_payload) {
      ELOG_INFO << "client_id " << config_.client_id
                << " client_close_socket_after_send_payload";
      close();
      co_return rpc_error{errc::io_error, ret.first.message()};
    }
#endif
    if (ret.first) {
      close();
      if (control_->is_timeout_) {
        co_return rpc_error{errc::timed_out};
      }
      else {
        co_return rpc_error{errc::io_error, ret.first.message()};
      }
    }
    co_return rpc_error{};
  }