async_simple::coro::Lazy start()

in include/ylt/standalone/cinatra/coro_http_connection.hpp [104:416]


  /// Runs the request loop of this connection.
  ///
  /// Each iteration (optionally) performs the server-side TLS handshake, reads
  /// and parses one request head, validates the declared body length, loads a
  /// non-chunked / non-multipart body into body_, dispatches the request
  /// through the router and sends the response. Requests that are already
  /// buffered behind the first one (pipelining) are answered in the same
  /// write; only GET and HEAD are supported for that.
  ///
  /// The loop ends when the handshake, a read, the header parse, the body
  /// length check, the websocket handshake reply or a write fails; the
  /// connection is closed on each of those paths.
  ///
  /// @param has_shake  true when the TLS handshake has already been performed
  ///                   and must be skipped here. Only read when
  ///                   CINATRA_ENABLE_SSL is defined.
  async_simple::coro::Lazy<void> start(bool has_shake = false) {
    while (true) {
#ifdef CINATRA_ENABLE_SSL
      if (socket_wrapper_.use_ssl() && !has_shake) {
        auto ec = co_await coro_io::async_handshake(
            socket_wrapper_.ssl_stream(), asio::ssl::stream_base::server);
        if (ec) {
          CINATRA_LOG_ERROR << "handle_shake error: " << ec.message();
          close();
          break;
        }

        has_shake = true;
      }
#endif
      // Read one request head, delimited by TWO_CRCF.
      auto [ec, size] = co_await async_read_until(head_buf_, TWO_CRCF);
      if (ec) {
        // eof is not reported as a warning; every other error is.
        if (ec != asio::error::eof) {
          CINATRA_LOG_WARNING << "read http header error: " << ec.message();
        }

        close();
        break;
      }

      const char *data_ptr = asio::buffer_cast<const char *>(head_buf_.data());
      int head_len = parser_.parse_request(data_ptr, size, 0);
      if (head_len <= 0) {
        CINATRA_LOG_ERROR << "parse http header error";
        response_.set_status_and_content(status_type::bad_request,
                                         "invalid http protocol");
        co_await reply();
        close();
        break;
      }

      if (parser_.body_len() > max_http_body_len_ || parser_.body_len() < 0)
          [[unlikely]] {
        CINATRA_LOG_ERROR << "invalid http content length: "
                          << parser_.body_len();
        response_.set_status_and_content(status_type::bad_request,
                                         "invalid http content length");
        co_await reply();
        close();
        break;
      }

      // Drop the head; whatever remains in head_buf_ is body data and/or
      // further buffered requests.
      head_buf_.consume(size);
      keep_alive_ = check_keep_alive();

      auto type = request_.get_content_type();

      // Chunked and multipart bodies are not loaded here.
      if (type != content_type::chunked && type != content_type::multipart) {
        size_t body_len = (size_t)parser_.body_len();
        if (body_len == 0) {
          if (parser_.method() == "GET"sv) {
            if (request_.is_upgrade()) {
#ifdef CINATRA_ENABLE_GZIP
              if (request_.is_support_compressed()) {
                is_client_ws_compressed_ = true;
              }
              else {
                is_client_ws_compressed_ = false;
              }
#endif
              // websocket
              build_ws_handshake_head();
              bool ok = co_await reply(true);  // response ws handshake
              if (!ok) {
                close();
                break;
              }
              // The handshake has been sent above, so the regular reply at
              // the end of this iteration is skipped via the delay flag.
              response_.set_delay(true);
            }
          }
        }
        else if (body_len <= head_buf_.size()) {
          // The whole body is already buffered (body_len is non-zero here).
          detail::resize(body_, body_len);
          auto body_ptr = asio::buffer_cast<const char *>(head_buf_.data());
          memcpy(body_.data(), body_ptr, body_len);
          // Everything buffered is discarded, including any bytes that follow
          // the body.
          head_buf_.consume(head_buf_.size());
        }
        else {
          // Only the first part of the body is buffered: copy it, then read
          // the remainder straight into body_.
          size_t part_size = head_buf_.size();
          size_t size_to_read = body_len - part_size;
          auto body_ptr = asio::buffer_cast<const char *>(head_buf_.data());
          detail::resize(body_, body_len);
          memcpy(body_.data(), body_ptr, part_size);
          head_buf_.consume(part_size);

          auto [read_ec, read_size] = co_await async_read(
              asio::buffer(body_.data() + part_size, size_to_read),
              size_to_read);
          if (read_ec) {
            CINATRA_LOG_ERROR << "async_read error: " << read_ec.message();
            close();
            break;
          }
        }
      }

      // Routing key: a view that starts at the method and spans
      // method length + 1 + url length characters.
      std::string_view key = {
          parser_.method().data(),
          parser_.method().length() + 1 + parser_.url().length()};

      // decode_key owns the decoded text that key refers to afterwards, so it
      // has to stay alive for the rest of the iteration.
      std::string decode_key;
      if (parser_.url().find('%') != std::string_view::npos) {
        decode_key = code_utils::url_decode(key);
        key = decode_key;
      }

      if (!body_.empty()) {
        request_.set_body(body_);
      }

      // Dispatch order: exact handler -> exact coro handler -> radix route ->
      // radix coro route -> regex coro -> regex -> default -> not found.
      if (auto handler = router_.get_handler(key); handler) {
        router_.route(handler, request_, response_, key);
      }
      else {
        if (auto coro_handler = router_.get_coro_handler(key); coro_handler) {
          co_await router_.route_coro(coro_handler, request_, response_, key);
        }
        else {
          bool is_exist = false;
          bool is_coro_exist = false;
          bool is_matched_regex_router = false;
          std::function<void(coro_http_request & req,
                             coro_http_response & resp)>
              tree_handler;
          std::string method_str{parser_.method()};
          std::string url_path = method_str;
          url_path.append(" ").append(parser_.url());
          std::tie(is_exist, tree_handler, request_.params_) =
              router_.get_router_tree()->get(url_path, method_str);
          if (is_exist) {
            if (tree_handler) {
              (tree_handler)(request_, response_);
            }
            else {
              response_.set_status(status_type::not_found);
            }
          }
          else {
            std::function<async_simple::coro::Lazy<void>(
                coro_http_request & req, coro_http_response & resp)>
                tree_coro_handler;

            std::tie(is_coro_exist, tree_coro_handler, request_.params_) =
                router_.get_coro_router_tree()->get_coro(url_path, method_str);

            if (is_coro_exist) {
              if (tree_coro_handler) {
                co_await tree_coro_handler(request_, response_);
              }
              else {
                response_.set_status(status_type::not_found);
              }
            }
            else {
              // coro regex router
              auto coro_regex_handlers = router_.get_coro_regex_handlers();
              if (coro_regex_handlers.size() != 0) {
                // regex_match needs a std::string subject; build it once for
                // the whole list instead of once per handler. It outlives
                // every handler call made from this loop.
                const std::string coro_regex_key{key};
                for (auto &pair : coro_regex_handlers) {
                  if (std::regex_match(coro_regex_key, request_.matches_,
                                       std::get<0>(pair))) {
                    auto regex_coro_handler = std::get<1>(pair);
                    if (regex_coro_handler) {
                      co_await regex_coro_handler(request_, response_);
                      is_matched_regex_router = true;
                    }
                  }
                }
              }
              // regex router
              if (!is_matched_regex_router) {
                auto regex_handlers = router_.get_regex_handlers();
                if (regex_handlers.size() != 0) {
                  // Same as above: one subject string for the whole list.
                  const std::string regex_key{key};
                  for (auto &pair : regex_handlers) {
                    if (std::regex_match(regex_key, request_.matches_,
                                         std::get<0>(pair))) {
                      auto regex_handler = std::get<1>(pair);
                      if (regex_handler) {
                        (regex_handler)(request_, response_);
                        is_matched_regex_router = true;
                      }
                    }
                  }
                }
              }
              // radix route -> radix coro route -> regex coro -> regex ->
              // default -> not found
              if (!is_matched_regex_router) {
                if (default_handler_) {
                  co_await default_handler_(request_, response_);
                }
                else {
                  // not found
                  response_.set_status(status_type::not_found);
                }
              }
            }
          }
        }
      }

      if (!response_.get_delay()) {
        if (head_buf_.size()) {
          // Data is still buffered after the request has been handled.
          if (type == content_type::multipart ||
              type == content_type::chunked) {
            if (response_.content().empty()) {
              response_.set_status_and_content(
                  status_type::not_implemented,
                  "mutipart handler not implemented or incorrect implemented");
            }
            co_await reply();
            close();
            // The header-read error code is known to be unset on this path,
            // so it is not appended to the message.
            CINATRA_LOG_ERROR
                << "mutipart handler not implemented or incorrect implemented";
            break;
          }
          else if (parser_.method()[0] != 'G' && parser_.method()[0] != 'H') {
            // handle pipeling, only support GET and HEAD method now.
            response_.set_status_and_content(status_type::method_not_allowed,
                                             "method not allowed");
            co_await reply();
          }
          else {
            // Pipelining: append the response of every fully buffered request
            // to resp_str_ and send all of them with a single write.
            resp_str_.reserve(512);
            response_.build_resp_str(resp_str_);

            while (true) {
              size_t left_size = head_buf_.size();
              auto next_data_ptr =
                  asio::buffer_cast<const char *>(head_buf_.data());
              std::string_view left_content{next_data_ptr, left_size};
              size_t pos = left_content.find(TWO_CRCF);
              if (pos == std::string_view::npos) {
                // No further complete head is buffered.
                break;
              }
              http_parser parser;
              int next_head_len =
                  parser.parse_request(next_data_ptr, left_size, 0);
              if (next_head_len <= 0) {
                CINATRA_LOG_ERROR << "parse http header error";
                response_.set_status_and_content(status_type::bad_request,
                                                 "invalid http protocol");
                co_await reply();
                close();
                break;
              }

              head_buf_.consume(pos + TWO_CRCF.length());

              std::string_view next_key = {
                  parser.method().data(),
                  parser.method().length() + 1 + parser.url().length()};

              coro_http_request req(parser, this);
              coro_http_response resp(this);
              resp.need_date_head(response_.need_date());
              // Route with the key of the pipelined request itself, i.e. the
              // same key that was used to look its handler up.
              if (auto handler = router_.get_handler(next_key); handler) {
                router_.route(handler, req, resp, next_key);
              }
              else {
                if (auto coro_handler = router_.get_coro_handler(next_key);
                    coro_handler) {
                  co_await router_.route_coro(coro_handler, req, resp,
                                              next_key);
                }
                else {
                  resp.set_status(status_type::not_found);
                }
              }

              resp.build_resp_str(resp_str_);
            }

            auto [write_ec, _] = co_await async_write(asio::buffer(resp_str_));
            if (write_ec) {
              CINATRA_LOG_ERROR << "async_write error: " << write_ec.message();
              close();
              co_return;
            }
          }
          head_buf_.consume(head_buf_.size());
        }
        else {
          handle_session_for_response();
          co_await reply();
        }
      }

      if (!keep_alive_) {
        // now in io thread, so can close socket immediately.
        close();
      }

      // Reset the per-request state before the next iteration.
      response_.clear();
      request_.clear();
      buffers_.clear();
      body_.clear();
      resp_str_.clear();
      multi_buf_ = true;
      if (need_shrink_every_time_) {
        body_.shrink_to_fit();
      }
    }
  }