include/ylt/standalone/cinatra/coro_http_connection.hpp (898 lines of code) (raw):

#pragma once #include <async_simple/Executor.h> #include <async_simple/coro/SyncAwait.h> #include <asio/buffer.hpp> #include <system_error> #include <thread> #include "asio/dispatch.hpp" #include "asio/streambuf.hpp" #include "async_simple/coro/Lazy.h" #include "cinatra/cinatra_log_wrapper.hpp" #include "cinatra/response_cv.hpp" #include "cookie.hpp" #include "coro_http_request.hpp" #include "coro_http_router.hpp" #include "define.h" #include "http_parser.hpp" #include "multipart.hpp" #include "session_manager.hpp" #include "sha1.hpp" #include "string_resize.hpp" #include "websocket.hpp" #ifdef CINATRA_ENABLE_GZIP #include "gzip.hpp" #endif #include "ylt/coro_io/coro_file.hpp" #include "ylt/coro_io/coro_io.hpp" namespace cinatra { struct websocket_result { std::error_code ec; ws_frame_type type; std::string_view data; bool eof; }; class coro_http_connection : public std::enable_shared_from_this<coro_http_connection> { public: template <typename executor_t> coro_http_connection(executor_t *executor, coro_io::socket_wrapper_t &&socket_wrapper, coro_http_router &router) : executor_(executor), socket_wrapper_(std::move(socket_wrapper)), router_(router), request_(parser_, this), response_(this) { buffers_.reserve(3); } ~coro_http_connection() { close(); } #ifdef CINATRA_ENABLE_SSL bool init_ssl( std::unique_ptr<asio::ssl::stream<asio::ip::tcp::socket &>> ssl_stream) { if (ssl_stream) { socket_wrapper_.ssl_stream() = std::move(ssl_stream); return true; } return false; } bool init_ssl(const std::string &cert_file, const std::string &key_file, std::string passwd) { unsigned long ssl_options = asio::ssl::context::default_workarounds | asio::ssl::context::no_sslv2 | asio::ssl::context::single_dh_use; try { ssl_ctx_ = std::make_unique<asio::ssl::context>(asio::ssl::context::sslv23); ssl_ctx_->set_options(ssl_options); if (!passwd.empty()) { ssl_ctx_->set_password_callback([pwd = std::move(passwd)](auto, auto) { return pwd; }); } std::error_code ec; if (fs::exists(cert_file, ec)) { ssl_ctx_->use_certificate_chain_file(std::move(cert_file)); } if (fs::exists(key_file, ec)) { ssl_ctx_->use_private_key_file(std::move(key_file), asio::ssl::context::pem); } socket_wrapper_.ssl_stream() = std::make_unique<asio::ssl::stream<asio::ip::tcp::socket &>>( *socket_wrapper_.socket(), *ssl_ctx_); } catch (const std::exception &e) { CINATRA_LOG_ERROR << "init ssl failed, reason: " << e.what(); return false; } return true; } #endif void add_head(std::string_view msg) { head_buf_.sputn(msg.data(), msg.size()); } async_simple::coro::Lazy<void> start(bool has_shake = false) { std::chrono::system_clock::time_point start{}; std::chrono::system_clock::time_point mid{}; while (true) { #ifdef CINATRA_ENABLE_SSL if (socket_wrapper_.use_ssl() && !has_shake) { auto ec = co_await coro_io::async_handshake( socket_wrapper_.ssl_stream(), asio::ssl::stream_base::server); if (ec) { CINATRA_LOG_ERROR << "handle_shake error: " << ec.message(); close(); break; } has_shake = true; } #endif auto [ec, size] = co_await async_read_until(head_buf_, TWO_CRCF); if (ec) { if (ec != asio::error::eof) { CINATRA_LOG_WARNING << "read http header error: " << ec.message(); } close(); break; } const char *data_ptr = asio::buffer_cast<const char *>(head_buf_.data()); int head_len = parser_.parse_request(data_ptr, size, 0); if (head_len <= 0) { CINATRA_LOG_ERROR << "parse http header error"; response_.set_status_and_content(status_type::bad_request, "invalid http protocol"); co_await reply(); close(); break; } if (parser_.body_len() > max_http_body_len_ || parser_.body_len() < 0) [[unlikely]] { CINATRA_LOG_ERROR << "invalid http content length: " << parser_.body_len(); response_.set_status_and_content(status_type::bad_request, "invalid http content length"); co_await reply(); close(); break; } head_buf_.consume(size); keep_alive_ = check_keep_alive(); auto type = request_.get_content_type(); if (type != content_type::chunked && type != content_type::multipart) { size_t body_len = (size_t)parser_.body_len(); if (body_len == 0) { if (parser_.method() == "GET"sv) { if (request_.is_upgrade()) { #ifdef CINATRA_ENABLE_GZIP if (request_.is_support_compressed()) { is_client_ws_compressed_ = true; } else { is_client_ws_compressed_ = false; } #endif // websocket build_ws_handshake_head(); bool ok = co_await reply(true); // response ws handshake if (!ok) { close(); break; } response_.set_delay(true); } } } else if (body_len <= head_buf_.size()) { if (body_len > 0) { detail::resize(body_, body_len); auto data_ptr = asio::buffer_cast<const char *>(head_buf_.data()); memcpy(body_.data(), data_ptr, body_len); head_buf_.consume(head_buf_.size()); } } else { size_t part_size = head_buf_.size(); size_t size_to_read = body_len - part_size; auto data_ptr = asio::buffer_cast<const char *>(head_buf_.data()); detail::resize(body_, body_len); memcpy(body_.data(), data_ptr, part_size); head_buf_.consume(part_size); auto [ec, size] = co_await async_read( asio::buffer(body_.data() + part_size, size_to_read), size_to_read); if (ec) { CINATRA_LOG_ERROR << "async_read error: " << ec.message(); close(); break; } } } std::string_view key = { parser_.method().data(), parser_.method().length() + 1 + parser_.url().length()}; std::string decode_key; if (parser_.url().find('%') != std::string_view::npos) { decode_key = code_utils::url_decode(key); key = decode_key; } if (!body_.empty()) { request_.set_body(body_); } if (auto handler = router_.get_handler(key); handler) { router_.route(handler, request_, response_, key); } else { if (auto coro_handler = router_.get_coro_handler(key); coro_handler) { co_await router_.route_coro(coro_handler, request_, response_, key); } else { bool is_exist = false; bool is_coro_exist = false; bool is_matched_regex_router = false; std::function<void(coro_http_request & req, coro_http_response & resp)> handler; std::string method_str{parser_.method()}; std::string url_path = method_str; url_path.append(" ").append(parser_.url()); std::tie(is_exist, handler, request_.params_) = router_.get_router_tree()->get(url_path, method_str); if (is_exist) { if (handler) { (handler)(request_, response_); } else { response_.set_status(status_type::not_found); } } else { std::function<async_simple::coro::Lazy<void>( coro_http_request & req, coro_http_response & resp)> coro_handler; std::tie(is_coro_exist, coro_handler, request_.params_) = router_.get_coro_router_tree()->get_coro(url_path, method_str); if (is_coro_exist) { if (coro_handler) { co_await coro_handler(request_, response_); } else { response_.set_status(status_type::not_found); } } else { // coro regex router auto coro_regex_handlers = router_.get_coro_regex_handlers(); if (coro_regex_handlers.size() != 0) { for (auto &pair : coro_regex_handlers) { std::string coro_regex_key{key}; if (std::regex_match(coro_regex_key, request_.matches_, std::get<0>(pair))) { auto coro_handler = std::get<1>(pair); if (coro_handler) { co_await coro_handler(request_, response_); is_matched_regex_router = true; } } } } // regex router if (!is_matched_regex_router) { auto regex_handlers = router_.get_regex_handlers(); if (regex_handlers.size() != 0) { for (auto &pair : regex_handlers) { std::string regex_key{key}; if (std::regex_match(regex_key, request_.matches_, std::get<0>(pair))) { auto handler = std::get<1>(pair); if (handler) { (handler)(request_, response_); is_matched_regex_router = true; } } } } } // radix route -> radix coro route -> regex coro -> regex -> // default -> not found if (!is_matched_regex_router) { if (default_handler_) { co_await default_handler_(request_, response_); } else { // not found response_.set_status(status_type::not_found); } } } } } } if (!response_.get_delay()) { if (head_buf_.size()) { if (type == content_type::multipart || type == content_type::chunked) { if (response_.content().empty()) response_.set_status_and_content( status_type::not_implemented, "mutipart handler not implemented or incorrect implemented"); co_await reply(); close(); CINATRA_LOG_ERROR << "mutipart handler not implemented or incorrect implemented" << ec.message(); break; } else if (parser_.method()[0] != 'G' && parser_.method()[0] != 'H') { // handle pipeling, only support GET and HEAD method now. response_.set_status_and_content(status_type::method_not_allowed, "method not allowed"); co_await reply(); } else { resp_str_.reserve(512); response_.build_resp_str(resp_str_); while (true) { size_t left_size = head_buf_.size(); auto next_data_ptr = asio::buffer_cast<const char *>(head_buf_.data()); std::string_view left_content{next_data_ptr, left_size}; size_t pos = left_content.find(TWO_CRCF); if (pos == std::string_view::npos) { break; } http_parser parser; int head_len = parser.parse_request(next_data_ptr, left_size, 0); if (head_len <= 0) { CINATRA_LOG_ERROR << "parse http header error"; response_.set_status_and_content(status_type::bad_request, "invalid http protocol"); co_await reply(); close(); break; } head_buf_.consume(pos + TWO_CRCF.length()); std::string_view next_key = { parser.method().data(), parser.method().length() + 1 + parser.url().length()}; coro_http_request req(parser, this); coro_http_response resp(this); resp.need_date_head(response_.need_date()); if (auto handler = router_.get_handler(next_key); handler) { router_.route(handler, req, resp, key); } else { if (auto coro_handler = router_.get_coro_handler(next_key); coro_handler) { co_await router_.route_coro(coro_handler, req, resp, key); } else { resp.set_status(status_type::not_found); } } resp.build_resp_str(resp_str_); } auto [write_ec, _] = co_await async_write(asio::buffer(resp_str_)); if (write_ec) { CINATRA_LOG_ERROR << "async_write error: " << write_ec.message(); close(); co_return; } } head_buf_.consume(head_buf_.size()); } else { handle_session_for_response(); co_await reply(); } } if (!keep_alive_) { // now in io thread, so can close socket immediately. close(); } response_.clear(); request_.clear(); buffers_.clear(); body_.clear(); resp_str_.clear(); multi_buf_ = true; if (need_shrink_every_time_) { body_.shrink_to_fit(); } } } async_simple::coro::Lazy<bool> reply(bool need_to_bufffer = true) { std::error_code ec; size_t size; if (multi_buf_) { if (need_to_bufffer) { response_.to_buffers(buffers_, chunk_size_str_); } std::tie(ec, size) = co_await async_write(buffers_); } else { if (need_to_bufffer) { response_.build_resp_str(resp_str_); } std::tie(ec, size) = co_await async_write(asio::buffer(resp_str_)); } if (ec) { CINATRA_LOG_ERROR << "async_write error: " << ec.message(); close(); co_return false; } co_return true; } std::string local_address() { std::string addr; set_address_impl(addr, false); return addr; } std::string remote_address() { if (!remote_addr_.empty()) { return remote_addr_; } set_address_impl(remote_addr_); return remote_addr_; } size_t available() { std::error_code ec{}; return socket_wrapper_.socket()->available(ec); } void set_multi_buf(bool r) { multi_buf_ = r; } void set_default_handler( std::function<async_simple::coro::Lazy<void>( coro_http_request &, coro_http_response &)> &handler) { default_handler_ = handler; } void set_max_http_body_size(int64_t max_size) { max_http_body_len_ = max_size; } #ifdef INJECT_FOR_HTTP_SEVER_TEST void set_write_failed_forever(bool r) { write_failed_forever_ = r; } void set_read_failed_forever(bool r) { read_failed_forever_ = r; } #endif async_simple::coro::Lazy<bool> write_data(std::string_view message) { std::vector<asio::const_buffer> buffers; buffers.push_back(asio::buffer(message)); auto [ec, _] = co_await async_write(buffers); if (ec) { CINATRA_LOG_ERROR << "async_write error: " << ec.message(); close(); co_return false; } co_return true; } async_simple::coro::Lazy<bool> begin_chunked() { response_.set_delay(true); response_.set_status(status_type::ok); co_return co_await reply(); } async_simple::coro::Lazy<bool> write_chunked(std::string_view chunked_data, bool eof = false) { response_.set_delay(true); buffers_.clear(); to_chunked_buffers(buffers_, chunk_size_str_, chunked_data, eof); co_return co_await reply(false); } async_simple::coro::Lazy<bool> end_chunked() { co_return co_await write_chunked("", true); } async_simple::coro::Lazy<bool> begin_multipart( std::string_view boundary = "", std::string_view content_type = "") { response_.set_delay(true); response_.set_status(status_type::ok); if (boundary.empty()) { boundary = BOUNDARY; } if (content_type.empty()) { content_type = "multipart/form-data"; } std::string str{content_type}; str.append("; ").append("boundary=").append(boundary); response_.add_header("Content-Type", str); response_.set_boundary(boundary); co_return co_await reply(); } async_simple::coro::Lazy<bool> write_multipart( std::string_view part_data, std::string_view content_type) { response_.set_delay(true); buffers_.clear(); std::string part_head = "--"; part_head.append(response_.get_boundary()).append(CRCF); part_head.append("Content-Type: ").append(content_type).append(CRCF); part_head.append("Content-Length: ") .append(std::to_string(part_data.size())) .append(TWO_CRCF); buffers_.push_back(asio::buffer(part_head)); buffers_.push_back(asio::buffer(part_data)); buffers_.push_back(asio::buffer(CRCF)); auto [ec, _] = co_await async_write(buffers_); co_return !ec; } async_simple::coro::Lazy<bool> end_multipart() { response_.set_delay(true); buffers_.clear(); std::string multipart_end = "--"; multipart_end.append(response_.get_boundary()).append("--").append(CRCF); auto [ec, _] = co_await async_write(asio::buffer(multipart_end)); co_return !ec; } async_simple::coro::Lazy<chunked_result> read_chunked() { if (head_buf_.size() > 0) { const char *data_ptr = asio::buffer_cast<const char *>(head_buf_.data()); chunked_buf_.sputn(data_ptr, head_buf_.size()); head_buf_.consume(head_buf_.size()); } chunked_result result{}; std::error_code ec{}; size_t size = 0; if (std::tie(ec, size) = co_await async_read_until(chunked_buf_, CRCF); ec) { result.ec = ec; close(); co_return result; } size_t buf_size = chunked_buf_.size(); size_t additional_size = buf_size - size; const char *data_ptr = asio::buffer_cast<const char *>(chunked_buf_.data()); std::string_view size_str(data_ptr, size - CRCF.size()); size_t chunk_size; auto [ptr, err] = std::from_chars( size_str.data(), size_str.data() + size_str.size(), chunk_size, 16); if (err != std::errc{}) { CINATRA_LOG_ERROR << "bad chunked size"; result.ec = std::make_error_code(std::errc::invalid_argument); co_return result; } chunked_buf_.consume(size); if (additional_size < size_t(chunk_size + 2)) { // not a complete chunk, read left chunk data. size_t size_to_read = chunk_size + 2 - additional_size; if (std::tie(ec, size) = co_await async_read(chunked_buf_, size_to_read); ec) { result.ec = ec; close(); co_return result; } } if (chunk_size == 0) { // all finished, no more data chunked_buf_.consume(chunked_buf_.size()); result.eof = true; co_return result; } data_ptr = asio::buffer_cast<const char *>(chunked_buf_.data()); result.data = std::string_view{data_ptr, (size_t)chunk_size}; chunked_buf_.consume(chunk_size + CRCF.size()); co_return result; } async_simple::coro::Lazy<std::error_code> write_websocket( std::string_view msg, opcode op = opcode::text, bool eof = true) { std::vector<asio::const_buffer> buffers; std::string_view header; #ifdef CINATRA_ENABLE_GZIP std::string dest_buf; if (is_client_ws_compressed_ && msg.size() > 0) { if (!cinatra::gzip_codec::deflate(msg, dest_buf)) { CINATRA_LOG_ERROR << "compress data error, data: " << msg; co_return std::make_error_code(std::errc::protocol_error); } header = ws_.encode_ws_header(dest_buf.length(), op, eof, true, false); buffers.push_back(asio::buffer(header)); buffers.push_back(asio::buffer(dest_buf)); } else { #endif header = ws_.encode_ws_header(msg.length(), op, eof, false, false); buffers.push_back(asio::buffer(header)); buffers.push_back(asio::buffer(msg)); #ifdef CINATRA_ENABLE_GZIP } #endif auto [ec, sz] = co_await async_write(buffers); co_return ec; } async_simple::coro::Lazy<websocket_result> read_websocket() { auto [ec, ws_hd_size] = co_await async_read(head_buf_, SHORT_HEADER); websocket_result result{ec}; if (ec) { close(); co_return result; } while (true) { const char *data_ptr = asio::buffer_cast<const char *>(head_buf_.data()); auto status = ws_.parse_header(data_ptr, ws_.len_bytes()); if (status == ws_header_status::complete) { ws_.reset_len_bytes(); head_buf_.consume(head_buf_.size()); std::span<char> payload{}; auto payload_length = ws_.payload_length(); if (max_part_size_ != 0 && payload_length > max_part_size_) { std::string close_reason = "message_too_big"; std::string close_msg = ws_.format_close_payload( close_code::too_big, close_reason.data(), close_reason.size()); co_await write_websocket(close_msg, opcode::close); close(); result.ec = std::error_code(asio::error::message_size, asio::error::get_system_category()); break; } if (payload_length > 0) { detail::resize(body_, payload_length); auto [ec, read_sz] = co_await async_read(asio::buffer(body_), payload_length); if (ec) { close(); result.ec = ec; break; } payload = body_; } ws_frame_type type = ws_.parse_payload(payload); switch (type) { case cinatra::ws_frame_type::WS_ERROR_FRAME: close(); result.ec = std::make_error_code(std::errc::protocol_error); break; case cinatra::ws_frame_type::WS_OPENING_FRAME: continue; case ws_frame_type::WS_INCOMPLETE_TEXT_FRAME: case ws_frame_type::WS_INCOMPLETE_BINARY_FRAME: result.eof = false; result.data = {payload.data(), payload.size()}; break; case cinatra::ws_frame_type::WS_TEXT_FRAME: case cinatra::ws_frame_type::WS_BINARY_FRAME: { #ifdef CINATRA_ENABLE_GZIP if (!gzip_compress(payload, result)) { break; } #endif result.eof = true; result.data = {payload.data(), payload.size()}; } break; case cinatra::ws_frame_type::WS_CLOSE_FRAME: { #ifdef CINATRA_ENABLE_GZIP if (!gzip_compress(payload, result)) { break; } #endif close_frame close_frame = ws_.parse_close_payload(payload.data(), payload.size()); result.eof = true; result.data = {close_frame.message, close_frame.length}; std::string close_msg = ws_.format_close_payload( close_code::normal, close_frame.message, close_frame.length); co_await write_websocket(close_msg, opcode::close); close(); } break; case cinatra::ws_frame_type::WS_PING_FRAME: { result.data = {payload.data(), payload.size()}; auto ec = co_await write_websocket(result.data, opcode::pong); if (ec) { close(); result.ec = ec; } } break; case cinatra::ws_frame_type::WS_PONG_FRAME: { result.data = {payload.data(), payload.size()}; auto ec = co_await write_websocket(result.data, opcode::ping); result.ec = ec; } break; default: break; } result.type = type; co_return result; } else if (status == ws_header_status::incomplete) { auto [ec, sz] = co_await async_read(head_buf_, ws_.left_header_len()); if (ec) { close(); result.ec = ec; break; } continue; } else { close(); result.ec = std::make_error_code(std::errc::protocol_error); co_return result; } } co_return result; } #ifdef CINATRA_ENABLE_GZIP bool gzip_compress(std::span<char> &payload, websocket_result &result) { if (is_client_ws_compressed_) { inflate_str_.clear(); if (!cinatra::gzip_codec::inflate({payload.data(), payload.size()}, inflate_str_)) { CINATRA_LOG_ERROR << "compress data error"; result.ec = std::make_error_code(std::errc::protocol_error); return false; } payload = inflate_str_; } return true; } #endif auto &tcp_socket() { return *socket_wrapper_.socket(); } void set_quit_callback(std::function<void(const uint64_t &conn_id)> callback, uint64_t conn_id) { quit_cb_ = std::move(callback); conn_id_ = conn_id; } void set_ws_max_size(uint64_t max_size) { max_part_size_ = max_size; } void set_shrink_to_fit(bool r) { need_shrink_every_time_ = r; response_.set_shrink_to_fit(r); } #ifdef INJECT_FOR_HTTP_SEVER_TEST async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_write_failed() { co_return std::make_pair(std::make_error_code(std::errc::io_error), 0); } async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_read_failed() { co_return std::make_pair(std::make_error_code(std::errc::io_error), 0); } #endif template <typename AsioBuffer> async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_read( AsioBuffer &&buffer, size_t size_to_read) noexcept { #ifdef INJECT_FOR_HTTP_SEVER_TEST if (read_failed_forever_) { return async_read_failed(); } #endif set_last_time(); #ifdef CINATRA_ENABLE_SSL if (socket_wrapper_.use_ssl()) { return coro_io::async_read(*socket_wrapper_.ssl_stream(), buffer, size_to_read); } else { #endif return coro_io::async_read(*socket_wrapper_.socket(), buffer, size_to_read); #ifdef CINATRA_ENABLE_SSL } #endif } template <typename AsioBuffer> async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_write( AsioBuffer &&buffer) { #ifdef INJECT_FOR_HTTP_SEVER_TEST if (write_failed_forever_) { return async_write_failed(); } #endif set_last_time(); #ifdef CINATRA_ENABLE_SSL if (socket_wrapper_.use_ssl()) { return coro_io::async_write(*socket_wrapper_.ssl_stream(), buffer); } else { #endif return coro_io::async_write(*socket_wrapper_.socket(), buffer); #ifdef CINATRA_ENABLE_SSL } #endif } template <typename AsioBuffer> async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_read_until( AsioBuffer &buffer, asio::string_view delim) noexcept { set_last_time(); #ifdef CINATRA_ENABLE_SSL if (socket_wrapper_.use_ssl()) { return coro_io::async_read_until(*socket_wrapper_.ssl_stream(), buffer, delim); } else { #endif return coro_io::async_read_until(*socket_wrapper_.socket(), buffer, delim); #ifdef CINATRA_ENABLE_SSL } #endif } void set_last_time() { if (checkout_timeout_) { last_rwtime_ = std::chrono::system_clock::now(); } } std::chrono::system_clock::time_point get_last_rwtime() { return last_rwtime_; } auto get_executor() { return executor_; } void close(bool need_cb = true) { if (has_closed_) { return; } if (socket_wrapper_.socket()) { asio::dispatch(socket_wrapper_.socket()->get_executor(), [this, need_cb, self = shared_from_this()] { std::error_code ec; socket_wrapper_.socket()->shutdown( asio::socket_base::shutdown_both, ec); socket_wrapper_.socket()->close(ec); if (need_cb && quit_cb_) { quit_cb_(conn_id_); } has_closed_ = true; }); } } void set_check_timeout(bool r) { checkout_timeout_ = r; } void handle_session_for_response() { if (request_.has_session()) { auto session = session_manager::get().get_session(request_.get_cached_session_id()); if (session != nullptr && session->get_need_set_to_client()) { response_.add_cookie(session->get_session_cookie()); session->set_need_set_to_client(false); } } } private: bool check_keep_alive() { if (parser_.has_close()) { return false; } return true; } void build_ws_handshake_head() { uint8_t sha1buf[20], key_src[60]; char accept_key[29]; std::memcpy(key_src, request_.get_header_value("sec-websocket-key").data(), 24); std::memcpy(key_src + 24, ws_guid, 36); sha1_context ctx; init(ctx); update(ctx, key_src, sizeof(key_src)); finish(ctx, sha1buf); code_utils::base64_encode(accept_key, sha1buf, sizeof(sha1buf), 0); response_.set_status_and_content(status_type::switching_protocols, ""); response_.add_header("Upgrade", "WebSocket"); response_.add_header("Connection", "Upgrade"); response_.add_header("Sec-WebSocket-Accept", std::string(accept_key, 28)); auto protocal_str = request_.get_header_value("sec-websocket-protocol"); #ifdef CINATRA_ENABLE_GZIP if (is_client_ws_compressed_) { response_.add_header("Sec-WebSocket-Extensions", "permessage-deflate; client_no_context_takeover"); } #endif if (!protocal_str.empty()) { response_.add_header("Sec-WebSocket-Protocol", std::string(protocal_str)); } } void set_address_impl(std::string &address, bool remote = true) { if (has_closed_) { return; } std::error_code ec; auto pt = remote ? socket_wrapper_.socket()->remote_endpoint(ec) : socket_wrapper_.socket()->local_endpoint(ec); if (ec) { return; } address = pt.address().to_string(ec); if (ec) { return; } address.append(":").append(std::to_string(pt.port())); } private: friend class multipart_reader_t<coro_http_connection>; coro_io::ExecutorWrapper<> *executor_; coro_io::socket_wrapper_t socket_wrapper_; coro_http_router &router_; asio::streambuf head_buf_; std::string body_; asio::streambuf chunked_buf_; http_parser parser_; bool keep_alive_; coro_http_request request_; coro_http_response response_; std::vector<asio::const_buffer> buffers_; std::atomic<bool> has_closed_{false}; uint64_t conn_id_{0}; std::function<void(const uint64_t &conn_id)> quit_cb_ = nullptr; bool checkout_timeout_ = false; std::atomic<std::chrono::system_clock::time_point> last_rwtime_ = std::chrono::system_clock::now(); uint64_t max_part_size_ = 8 * 1024 * 1024; std::string resp_str_; #ifdef CINATRA_ENABLE_GZIP bool is_client_ws_compressed_ = false; std::string inflate_str_; #endif websocket ws_; #ifdef CINATRA_ENABLE_SSL std::unique_ptr<asio::ssl::context> ssl_ctx_ = nullptr; #endif bool need_shrink_every_time_ = false; bool multi_buf_ = true; std::function<async_simple::coro::Lazy<void>(coro_http_request &, coro_http_response &)> default_handler_ = nullptr; std::string chunk_size_str_; std::string remote_addr_; int64_t max_http_body_len_ = 0; #ifdef INJECT_FOR_HTTP_SEVER_TEST bool write_failed_forever_ = false; bool read_failed_forever_ = false; #endif }; } // namespace cinatra