async_simple::coro::Lazy start_impl()

in include/ylt/coro_rpc/impl/coro_connection.hpp [208:369]


  // Per-connection request loop.
  //
  // Repeatedly reads one request header and its payload from `socket`, looks
  // up the target handler in `router`, dispatches the call and (for handlers
  // that return directly) sends the response. The loop ends on the first read
  // error, on an unrecognised serialization protocol, or on a unit-test
  // injected action; afterwards the connection is closed and the timer is
  // cancelled.
  //
  // @param router  Router used to resolve the route key to a plain handler or
  //                a coroutine handler and to execute it.
  // @param socket  Socket the request header/payload are read from.
  // @return A Lazy<void> that completes once the loop has exited and close()
  //         / cancel_timer() have been called.
  async_simple::coro::Lazy<void> start_impl(
      typename rpc_protocol::router &router, Socket &socket) noexcept {
    // Context shared with the handler; holds the request header, body,
    // attachment, route key and a shared_ptr back to this connection.
    auto context_info = std::make_shared<context_info_t<rpc_protocol>>(
        router, shared_from_this());
    reset_timer();
    // `magic_number` is handed to read_head() on every iteration. Without a
    // transfer callback it is pre-set to "SKIP"; with one it starts empty.
    // NOTE(review): presumably read_head() skips the magic check when it sees
    // "SKIP" and otherwise stores the bytes it read into this string so they
    // can be forwarded to the transfer callback — confirm against read_head().
    std::string magic_number;
    if (transfer_callback_ == nullptr) {
      magic_number = "SKIP";
    }
    for (int64_t req_count = 0;; req_count++) {
      // Read into a temporary first: `context_info` may be replaced below, so
      // the header is only moved into the context once that is settled.
      typename rpc_protocol::req_header req_head_tmp;
      // timer will be reset after rpc call response
      auto ec =
          co_await rpc_protocol::read_head(socket, req_head_tmp, magic_number);
      // `co_await async_read` uses asio::async_read underneath.
      // If eof occurs, the bytes_transferred of `co_await async_read` must be
      // less than RPC_HEAD_LEN. Incomplete data will be discarded.
      // So, no special handling of eof is required.
      if (ec) [[unlikely]] {
        ELOG_INFO << "connection " << conn_id_ << " close: " << ec.message();
        // Only on the very first request, and only when a transfer callback
        // is installed and `magic_number` is non-empty, hand the socket over
        // to that callback instead of treating this as a plain failure.
        if (req_count == 0 && magic_number.size() && transfer_callback_) {
          ELOG_INFO << "The connection is not coro_rpc connection.";
          // socket_wrapper_ is moved-from after this call; the loop exits
          // immediately below.
          (*transfer_callback_)(std::move(socket_wrapper_), magic_number);
        }
        break;
      }
      // we won't transfer connection after first check magic number
      magic_number = "SKIP";

#ifdef UNIT_TEST_INJECT
      // Test builds only: record the request's seq_num as the client id.
      client_id_ = req_head_tmp.seq_num;
      ELOG_INFO << "conn_id " << conn_id_ << " client_id " << client_id_;
#endif

#ifdef UNIT_TEST_INJECT
      // Test builds only: simulate the server dropping the connection right
      // after the header has been read.
      if (g_action == inject_action::close_socket_after_read_header) {
        ELOG_WARN << "inject action: close_socket_after_read_header, conn_id "
                  << conn_id_ << ", client_id " << client_id_;
        break;
      }
#endif

      // try to reuse context
      // If the previous request was flagged as "return by callback", the old
      // context object is not reused; a new one is allocated for this request.
      if (is_rpc_return_by_callback_) {
        // can't reuse the context, make a new shared one
        is_rpc_return_by_callback_ = false;
        if (context_info->status_ != context_status::finish_response) {
          // Previous request has not reached finish_response, so its buffers
          // are left alone.
          // can't reuse buffer
          context_info = std::make_shared<context_info_t<rpc_protocol>>(
              router, shared_from_this());
        }
        else {
          // Previous response is finished: move its body/attachment strings
          // into the new context.
          // reuse string buffer
          context_info = std::make_shared<context_info_t<rpc_protocol>>(
              router, shared_from_this(), std::move(context_info->req_body_),
              std::move(context_info->req_attachment_));
        }
      }
      // These references alias fields of the (possibly just replaced)
      // context object; they are re-bound on every iteration.
      auto &req_head = context_info->req_head_;
      auto &body = context_info->req_body_;
      auto &req_attachment = context_info->req_attachment_;
      auto &key = context_info->key_;
      req_head = std::move(req_head_tmp);
      auto serialize_proto = rpc_protocol::get_serialize_protocol(req_head);

      // No serialization protocol could be derived from the header: log and
      // leave the loop (which closes the connection).
      if (!serialize_proto.has_value())
        AS_UNLIKELY {
          ELOG_ERROR << "bad serialize protocol type, conn_id " << conn_id_;
          break;
        }

      std::string_view payload;
      // rpc_protocol::buffer_type maybe from user, default from framework.

      ec = co_await rpc_protocol::read_payload(socket, req_head, body,
                                               req_attachment);
      // The timer is cancelled as soon as the read returns, before the error
      // code is inspected.
      cancel_timer();
      // `payload` is a non-owning view of the body stored in the context.
      payload = std::string_view{body};

      if (ec)
        AS_UNLIKELY {
          ELOG_ERROR << "read error: " << ec.message() << ", conn_id "
                     << conn_id_;
          break;
        }

      key = rpc_protocol::get_route_key(req_head);
      auto handler = router.get_handler(key);
      // Counted before dispatch, for both the plain and the coroutine path.
      ++rpc_processing_cnt_;
      if (!handler) {
        // No plain handler registered for this key: dispatch through the
        // coroutine handler instead.
        auto coro_handler = router.get_coro_handler(key);
        // NOTE(review): presumably sets is_rpc_return_by_callback_ so the next
        // iteration allocates a fresh context — confirm.
        set_rpc_return_by_callback();
        // Start the handler coroutine without awaiting it here; the loop goes
        // on to read the next request. The completion callback holds its own
        // shared_ptr copy of the context.
        router.route_coro(coro_handler, payload, serialize_proto.value(), key)
            .template setLazyLocal<connection_lazy_ctx<rpc_protocol>>(
                context_info)
            .directlyStart(
                [context_info](auto &&result) mutable {
                  // ret.first is the rpc error code, ret.second the string
                  // passed on to direct_response_msg below.
                  std::pair<coro_rpc::err_code, std::string> &ret =
                      result.value();
                  if (ret.first)
                    AS_UNLIKELY {
                      ELOGI << "rpc error in function:"
                            << context_info->get_rpc_function_name()
                            << ". error code:" << ret.first.ec
                            << ". message : " << ret.second;
                    }
                  // Send the response from a task scheduled on the
                  // connection's executor; the context and the result are
                  // moved into that task.
                  auto executor = context_info->conn_->get_executor();
                  executor->schedule([context_info = std::move(context_info),
                                      ret = std::move(ret)]() mutable {
                    context_info->conn_
                        ->template direct_response_msg<rpc_protocol>(
                            ret.first, ret.second, context_info->req_head_,
                            std::move(context_info->resp_attachment_),
                            std::move(context_info->complete_handler_));
                  });
                },
                socket_wrapper_.get_executor());
      }
      else {
        // Plain handler: publish the raw context pointer via set_context()
        // before running the handler synchronously.
        coro_rpc::detail::set_context<rpc_protocol>() = context_info.get();
        auto &&[resp_err, resp_buf] = router.route(
            handler, payload, context_info, serialize_proto.value(), key);
        // NOTE(review): the flag was cleared (if set) earlier in this
        // iteration, so it being set here presumably means the handler chose
        // to reply later through the context — confirm.
        if (is_rpc_return_by_callback_) {
          if (!resp_err) {
            // No error: nothing is sent from here; go read the next request.
            continue;
          }
          else {
            // Routing reported an error: log it, clear the flag and fall
            // through so the error response is sent directly below.
            ELOGI << "rpc error in function:"
                  << context_info->get_rpc_function_name()
                  << ". error code:" << resp_err.ec
                  << ". message : " << resp_buf;
            is_rpc_return_by_callback_ = false;
          }
        }
#ifdef UNIT_TEST_INJECT
        // Test builds only: write just the buffer produced by
        // prepare_response() and then drop the connection.
        if (g_action == inject_action::close_socket_after_send_length) {
          ELOG_WARN
              << "inject action: close_socket_after_send_length , conn_id "
              << conn_id_ << ", client_id " << client_id_;
          std::string header_buf = rpc_protocol::prepare_response(
              resp_buf, req_head, 0, resp_err, "");
          co_await coro_io::async_write(socket, asio::buffer(header_buf));
          break;
        }
        // Test builds only: corrupt the first byte of the response buffer.
        if (g_action == inject_action::server_send_bad_rpc_result) {
          ELOG_WARN << "inject action: server_send_bad_rpc_result , conn_id "
                    << conn_id_ << ", client_id " << client_id_;
          resp_buf[0] = resp_buf[0] + 1;
        }
#endif
        // Send the response; the attachment provider and completion handler
        // are moved out of the context.
        direct_response_msg<rpc_protocol>(
            resp_err, resp_buf, req_head,
            std::move(context_info->resp_attachment_),
            std::move(context_info->complete_handler_));
        // Re-arm the moved-from attachment provider with one that yields an
        // empty view, since this context may be reused by the next request.
        context_info->resp_attachment_ = [] {
          return std::string_view{};
        };
      }
    }
    // Reached via any `break` above: close the connection and cancel the
    // timer.
    close();
    cancel_timer();
  }