include/ylt/standalone/cinatra/coro_http_client.hpp (2,204 lines of code) (raw):
#pragma once
#include <atomic>
#include <cassert>
#include <charconv>
#include <cstddef>
#include <filesystem>
#include <fstream>
#include <future>
#include <memory>
#include <optional>
#include <span>
#include <string_view>
#include <system_error>
#include <thread>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>
#include "asio/dispatch.hpp"
#include "asio/error.hpp"
#include "asio/ip/tcp.hpp"
#include "asio/streambuf.hpp"
#include "async_simple/Future.h"
#include "async_simple/Unit.h"
#include "async_simple/coro/FutureAwaiter.h"
#include "async_simple/coro/Lazy.h"
#ifdef CINATRA_ENABLE_GZIP
#include "gzip.hpp"
#endif
#ifdef CINATRA_ENABLE_BROTLI
#include "brzip.hpp"
#endif
#include "cinatra_log_wrapper.hpp"
#include "http_parser.hpp"
#include "multipart.hpp"
#include "picohttpparser.h"
#include "response_cv.hpp"
#include "string_resize.hpp"
#include "uri.hpp"
#include "websocket.hpp"
#include "ylt/coro_io/coro_file.hpp"
#include "ylt/coro_io/coro_io.hpp"
#include "ylt/coro_io/io_context_pool.hpp"
namespace coro_io {
template <typename T, typename U>
class client_pool;
}
namespace cinatra {
template <class, class = void>
struct is_stream : std::false_type {};
template <class T>
struct is_stream<
T, std::void_t<decltype(std::declval<T>().read(nullptr, 0),
std::declval<T>().async_read(nullptr, 0))>>
: std::true_type {};
template <class T>
constexpr bool is_stream_v = is_stream<T>::value;
template <class, class = void>
struct is_span : std::false_type {};
template <class T>
struct is_span<T, std::void_t<decltype(std::declval<T>().data(),
std::declval<T>().size())>>
: std::true_type {};
template <class T>
constexpr bool is_span_v = is_span<T>::value;
template <class, class = void>
struct is_smart_ptr : std::false_type {};
template <class T>
struct is_smart_ptr<
T, std::void_t<decltype(std::declval<T>().get(), *std::declval<T>(),
is_stream_v<typename T::element_type>)>>
: std::true_type {};
template <class T>
constexpr bool is_stream_ptr_v = is_smart_ptr<T>::value || std::is_pointer_v<T>;
struct http_header;
struct resp_data {
std::error_code net_err;
int status = 0;
bool eof = false;
std::string_view resp_body;
std::span<http_header> resp_headers;
#ifdef BENCHMARK_TEST
uint64_t total = 0;
#endif
};
template <typename String = std::string>
struct req_context {
req_content_type content_type = req_content_type::none;
std::string req_header; /*header string*/
String content; /*body*/
coro_io::coro_file *resp_body_stream = nullptr;
};
struct multipart_t {
std::string filename;
std::string content;
size_t size = 0;
};
struct read_result {
std::span<char> buf;
bool eof;
std::error_code err;
};
enum class upload_type_t { with_length, chunked, multipart };
class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
public:
struct config {
std::optional<std::chrono::steady_clock::duration> conn_timeout_duration;
std::optional<std::chrono::steady_clock::duration> req_timeout_duration;
std::string sec_key;
size_t max_single_part_size;
std::string proxy_host;
std::string proxy_port;
std::string proxy_auth_username;
std::string proxy_auth_passwd;
std::string proxy_auth_token;
bool enable_tcp_no_delay;
#ifdef CINATRA_ENABLE_SSL
bool use_ssl =
false; // if set use_ssl true, cinatra will add https automaticlly.
#endif
};
coro_http_client(asio::io_context::executor_type executor)
: executor_wrapper_(executor),
timer_(&executor_wrapper_),
socket_(std::make_shared<socket_t>(executor)),
head_buf_(socket_->head_buf_),
chunked_buf_(socket_->chunked_buf_) {}
coro_http_client(
coro_io::ExecutorWrapper<> *executor = coro_io::get_global_executor())
: coro_http_client(executor->get_asio_executor()) {}
bool init_config(const config &conf) {
config_ = conf;
if (conf.conn_timeout_duration.has_value()) {
set_conn_timeout(*conf.conn_timeout_duration);
}
if (conf.req_timeout_duration.has_value()) {
set_req_timeout(*conf.req_timeout_duration);
}
if (!conf.sec_key.empty()) {
set_ws_sec_key(conf.sec_key);
}
if (conf.max_single_part_size > 0) {
set_max_single_part_size(conf.max_single_part_size);
}
if (!conf.proxy_host.empty()) {
set_proxy_basic_auth(conf.proxy_host, conf.proxy_port);
}
if (!conf.proxy_auth_username.empty()) {
set_proxy_basic_auth(conf.proxy_auth_username, conf.proxy_auth_passwd);
}
if (!conf.proxy_auth_token.empty()) {
set_proxy_bearer_token_auth(conf.proxy_auth_token);
}
if (conf.enable_tcp_no_delay) {
enable_tcp_no_delay_ = conf.enable_tcp_no_delay;
}
#ifdef CINATRA_ENABLE_SSL
set_ssl_schema(conf.use_ssl);
#endif
return true;
}
~coro_http_client() { close(); }
void close() {
if (socket_ == nullptr || socket_->has_closed_)
return;
asio::dispatch(executor_wrapper_.get_asio_executor(), [socket = socket_] {
close_socket(*socket);
});
}
coro_io::ExecutorWrapper<> &get_executor() { return executor_wrapper_; }
const config &get_config() { return config_; }
#ifdef CINATRA_ENABLE_SSL
bool init_ssl(int verify_mode, const std::string &base_path,
const std::string &cert_file, const std::string &sni_hostname) {
if (has_init_ssl_) {
return true;
}
try {
ssl_ctx_ =
std::make_unique<asio::ssl::context>(asio::ssl::context::sslv23);
auto full_cert_file = std::filesystem::path(base_path).append(cert_file);
if (std::filesystem::exists(full_cert_file)) {
ssl_ctx_->load_verify_file(full_cert_file.string());
}
else {
if (!base_path.empty() || !cert_file.empty())
return false;
}
if (base_path.empty() && cert_file.empty()) {
ssl_ctx_->set_default_verify_paths();
}
ssl_ctx_->set_verify_mode(verify_mode);
socket_->ssl_stream_ =
std::make_unique<asio::ssl::stream<asio::ip::tcp::socket &>>(
socket_->impl_, *ssl_ctx_);
// ssl_ctx_.add_certificate_authority(asio::buffer(CA_PEM));
if (!sni_hostname.empty()) {
ssl_ctx_->set_verify_callback(
asio::ssl::host_name_verification(sni_hostname));
if (need_set_sni_host_) {
// Set SNI Hostname (many hosts need this to handshake successfully)
SSL_set_tlsext_host_name(socket_->ssl_stream_->native_handle(),
sni_hostname.c_str());
}
}
has_init_ssl_ = true;
} catch (std::exception &e) {
CINATRA_LOG_ERROR << "init ssl failed: " << e.what();
return false;
}
return true;
}
[[nodiscard]] bool init_ssl(int verify_mode = asio::ssl::verify_none,
std::string full_path = "",
const std::string &sni_hostname = "") {
std::string base_path;
std::string cert_file;
if (full_path.empty()) {
base_path = "";
cert_file = "";
}
else {
base_path = full_path.substr(0, full_path.find_last_of('/'));
cert_file = full_path.substr(full_path.find_last_of('/') + 1);
}
return init_ssl(verify_mode, base_path, cert_file, sni_hostname);
}
#endif
// return body_, the user will own body's lifetime.
std::string release_buf() {
if (body_.empty()) {
return std::move(resp_chunk_str_);
}
return std::move(body_);
}
#ifdef CINATRA_ENABLE_GZIP
void set_ws_deflate(bool enable_ws_deflate) {
enable_ws_deflate_ = enable_ws_deflate;
}
#endif
/*!
* Connect server
*
* only make socket connet(or handshake) to the host
*
* @param uri server address
* @param eps endpoints of resolve result. if eps is not nullptr and vector is
* empty, it will return the endpoints that, else if vector is not empty, it
* will use the eps to skill resolve and connect to server directly.
* @return resp_data
*/
async_simple::coro::Lazy<resp_data> connect(
std::string uri, std::vector<asio::ip::tcp::endpoint> *eps = nullptr) {
if (should_reset_) {
reset();
}
else {
should_reset_ = true;
}
resp_data data{};
bool no_schema = !has_schema(uri);
std::string append_uri;
if (no_schema) {
#ifdef CINATRA_ENABLE_SSL
if (is_ssl_schema_)
append_uri.append("https://").append(uri);
else
#endif
append_uri.append("http://").append(uri);
}
auto [ok, u] = handle_uri(data, no_schema ? append_uri : uri);
if (!ok) {
co_return resp_data{std::make_error_code(std::errc::protocol_error), 404};
}
{
auto time_out_guard =
timer_guard(this, conn_timeout_duration_, "connect timer");
if (u.is_websocket()) {
// build websocket http header
add_header("Upgrade", "websocket");
add_header("Connection", "Upgrade");
if (ws_sec_key_.empty()) {
ws_sec_key_ = "s//GYHa/XO7Hd2F2eOGfyA=="; // provide a random string.
}
add_header("Sec-WebSocket-Key", ws_sec_key_);
add_header("Sec-WebSocket-Version", "13");
#ifdef CINATRA_ENABLE_GZIP
if (enable_ws_deflate_)
add_header("Sec-WebSocket-Extensions",
"permessage-deflate; client_max_window_bits");
#endif
req_context<> ctx{};
data = co_await async_request(std::move(uri), http_method::GET,
std::move(ctx));
#ifdef CINATRA_ENABLE_GZIP
if (enable_ws_deflate_) {
for (auto c : data.resp_headers) {
if (c.name == "Sec-WebSocket-Extensions") {
if (c.value.find("permessage-deflate;") != std::string::npos) {
is_server_support_ws_deflate_ = true;
}
else {
is_server_support_ws_deflate_ = false;
}
break;
}
}
}
#endif
co_return data;
}
data = co_await connect(u, eps);
}
if (socket_->is_timeout_) {
co_return resp_data{std::make_error_code(std::errc::timed_out), 404};
}
if (!data.net_err) {
data.status = 200;
}
co_return data;
}
bool has_closed() { return socket_->has_closed_; }
const auto &get_headers() { return req_headers_; }
void set_headers(std::unordered_map<std::string, std::string> req_headers) {
req_headers_ = std::move(req_headers);
}
bool add_header(std::string key, std::string val) {
if (key.empty())
return false;
req_headers_[key] = std::move(val);
return true;
}
void set_ws_sec_key(std::string sec_key) { ws_sec_key_ = std::move(sec_key); }
void set_max_http_body_size(int64_t max_size) {
max_http_body_len_ = max_size;
}
size_t available() {
std::error_code ec{};
return socket_->impl_.available(ec);
}
async_simple::coro::Lazy<resp_data> read_websocket() {
auto time_out_guard =
timer_guard(this, req_timeout_duration_, "websocket timer");
co_return co_await async_read_ws();
}
async_simple::coro::Lazy<resp_data> write_websocket(
const char *data, opcode op = opcode::text) {
std::string str(data);
co_return co_await write_websocket(str, op);
}
async_simple::coro::Lazy<resp_data> write_websocket(
const char *data, size_t size, opcode op = opcode::text) {
std::string str(data, size);
co_return co_await write_websocket(str, op);
}
async_simple::coro::Lazy<resp_data> write_websocket(
std::string_view data, opcode op = opcode::text) {
std::string str(data);
co_return co_await write_websocket(str, op);
}
async_simple::coro::Lazy<resp_data> write_websocket(
std::string &data, opcode op = opcode::text) {
co_return co_await write_websocket(std::span<char>(data), op);
}
async_simple::coro::Lazy<resp_data> write_websocket(
std::string &&data, opcode op = opcode::text) {
co_return co_await write_websocket(std::span<char>(data), op);
}
async_simple::coro::Lazy<void> write_ws_frame(std::span<char> msg,
websocket ws, opcode op,
resp_data &data,
bool eof = true) {
auto header = ws.encode_frame(msg, op, eof, enable_ws_deflate_);
std::vector<asio::const_buffer> buffers{
asio::buffer(header), asio::buffer(msg.data(), msg.size())};
auto [ec, sz] = co_await async_write(buffers);
if (ec) {
data.net_err = ec;
data.status = 404;
}
}
#ifdef CINATRA_ENABLE_GZIP
void gzip_compress(std::string_view source, std::string &dest_buf,
std::span<char> &span, resp_data &data) {
if (enable_ws_deflate_ && is_server_support_ws_deflate_) {
if (cinatra::gzip_codec::deflate(source, dest_buf)) {
span = dest_buf;
}
else {
CINATRA_LOG_ERROR << "compress data error, data: " << source;
data.net_err = std::make_error_code(std::errc::protocol_error);
data.status = 404;
}
}
}
#endif
template <typename Source>
async_simple::coro::Lazy<resp_data> write_websocket(
Source source, opcode op = opcode::text) {
resp_data data{};
websocket ws{};
std::string close_str;
if (op == opcode::close) {
if constexpr (is_span_v<Source>) {
close_str = ws.format_close_payload(close_code::normal, source.data(),
source.size());
source = {close_str.data(), close_str.size()};
}
}
std::span<char> span{};
if constexpr (is_span_v<Source>) {
span = {source.data(), source.size()};
#ifdef CINATRA_ENABLE_GZIP
std::string dest_buf;
if (enable_ws_deflate_) {
gzip_compress({source.data(), source.size()}, dest_buf, span, data);
}
#endif
co_await write_ws_frame(span, ws, op, data, true);
}
else {
while (true) {
auto result = co_await source();
span = {result.buf.data(), result.buf.size()};
#ifdef CINATRA_ENABLE_GZIP
std::string dest_buf;
if (enable_ws_deflate_) {
gzip_compress({result.buf.data(), result.buf.size()}, dest_buf, span,
data);
}
#endif
co_await write_ws_frame(span, ws, op, data, result.eof);
if (result.eof || data.status == 404) {
break;
}
}
}
co_return data;
}
async_simple::coro::Lazy<resp_data> write_websocket_close(
std::string msg = "") {
co_return co_await write_websocket(std::move(msg), opcode::close);
}
#ifdef BENCHMARK_TEST
void set_bench_stop() { stop_bench_ = true; }
#endif
async_simple::coro::Lazy<resp_data> async_patch(
std::string uri,
std::unordered_map<std::string, std::string> headers = {}) {
return async_request(std::move(uri), cinatra::http_method::PATCH,
cinatra::req_context<>{}, std::move(headers));
}
async_simple::coro::Lazy<resp_data> async_options(
std::string uri,
std::unordered_map<std::string, std::string> headers = {}) {
return async_request(std::move(uri), cinatra::http_method::OPTIONS,
cinatra::req_context<>{}, std::move(headers));
}
async_simple::coro::Lazy<resp_data> async_trace(
std::string uri,
std::unordered_map<std::string, std::string> headers = {}) {
return async_request(std::move(uri), cinatra::http_method::TRACE,
cinatra::req_context<>{}, std::move(headers));
}
async_simple::coro::Lazy<resp_data> async_head(
std::string uri,
std::unordered_map<std::string, std::string> headers = {}) {
return async_request(std::move(uri), cinatra::http_method::HEAD,
cinatra::req_context<>{}, std::move(headers));
}
// CONNECT example.com HTTP/1.1
async_simple::coro::Lazy<resp_data> async_http_connect(
std::string uri,
std::unordered_map<std::string, std::string> headers = {}) {
return async_request(std::move(uri), cinatra::http_method::CONNECT,
cinatra::req_context<>{}, std::move(headers));
}
async_simple::coro::Lazy<resp_data> async_get(
std::string uri,
std::unordered_map<std::string, std::string> headers = {}) {
resp_data data{};
req_context<> ctx{};
data = co_await async_request(std::move(uri), http_method::GET,
std::move(ctx), std::move(headers));
#ifdef BENCHMARK_TEST
data.total = total_len_;
#endif
if (redirect_uri_.empty() || !is_redirect(data)) {
co_return data;
}
else {
if (enable_follow_redirect_)
data = co_await async_request(std::move(redirect_uri_),
http_method::GET, std::move(ctx));
co_return data;
}
}
resp_data get(std::string uri,
std::unordered_map<std::string, std::string> headers = {}) {
return async_simple::coro::syncAwait(
async_get(std::move(uri), std::move(headers)));
}
resp_data post(std::string uri, std::string content,
req_content_type content_type,
std::unordered_map<std::string, std::string> headers = {}) {
return async_simple::coro::syncAwait(async_post(
std::move(uri), std::move(content), content_type, std::move(headers)));
}
async_simple::coro::Lazy<resp_data> async_post(
std::string uri, std::string content, req_content_type content_type,
std::unordered_map<std::string, std::string> headers = {}) {
req_context<> ctx{content_type, "", std::move(content)};
return async_request(std::move(uri), http_method::POST, std::move(ctx),
std::move(headers));
}
async_simple::coro::Lazy<resp_data> async_delete(
std::string uri, std::string content, req_content_type content_type,
std::unordered_map<std::string, std::string> headers = {}) {
req_context<> ctx{content_type, "", std::move(content)};
return async_request(std::move(uri), http_method::DEL, std::move(ctx),
std::move(headers));
}
async_simple::coro::Lazy<resp_data> async_put(
std::string uri, std::string content, req_content_type content_type,
std::unordered_map<std::string, std::string> headers = {}) {
req_context<> ctx{content_type, "", std::move(content)};
return async_request(std::move(uri), http_method::PUT, std::move(ctx),
std::move(headers));
}
bool add_str_part(std::string name, std::string content) {
size_t size = content.size();
return form_data_
.emplace(std::move(name), multipart_t{"", std::move(content), size})
.second;
}
bool add_file_part(std::string name, std::string filename) {
if (form_data_.find(name) != form_data_.end()) {
CINATRA_LOG_WARNING << "name already exist: " << name;
return false;
}
std::error_code ec;
bool r = std::filesystem::exists(filename, ec);
if (!r || ec) {
if (ec) {
CINATRA_LOG_WARNING << ec.message();
}
CINATRA_LOG_WARNING << "file not exists, "
<< std::filesystem::current_path().string();
return false;
}
size_t file_size = std::filesystem::file_size(filename);
form_data_.emplace(std::move(name),
multipart_t{std::move(filename), "", file_size});
return true;
}
void set_max_single_part_size(size_t size) { max_single_part_size_ = size; }
struct timer_guard {
timer_guard(coro_http_client *self,
std::chrono::steady_clock::duration duration, std::string msg)
: self(self), dur_(duration) {
self->socket_->is_timeout_ = false;
if (duration.count() >= 0) {
self->timeout(self->timer_, duration, std::move(msg))
.start([](auto &&) {
});
}
return;
}
~timer_guard() {
if (dur_.count() > 0 && self->socket_->is_timeout_ == false) {
std::error_code ignore_ec;
self->timer_.cancel(ignore_ec);
}
}
coro_http_client *self;
std::chrono::steady_clock::duration dur_;
};
async_simple::coro::Lazy<resp_data> async_download(std::string uri,
std::string filename,
std::string range = "") {
resp_data data{};
coro_io::coro_file file;
file.open(filename, std::ios::trunc | std::ios::out);
if (!file.is_open()) {
data.net_err = std::make_error_code(std::errc::no_such_file_or_directory);
data.status = 404;
co_return data;
}
req_context<> ctx{};
if (range.empty()) {
add_header("Transfer-Encoding", "chunked");
ctx = {req_content_type::none, "", "", &file};
}
else {
std::string req_str = "Range: bytes=";
req_str.append(range).append(CRCF);
ctx = {req_content_type::none, std::move(req_str), {}, &file};
}
data = co_await async_request(std::move(uri), http_method::GET,
std::move(ctx));
co_return data;
}
resp_data download(std::string uri, std::string filename,
std::string range = "") {
return async_simple::coro::syncAwait(
async_download(std::move(uri), std::move(filename), std::move(range)));
}
bool is_body_in_out_buf() const { return !out_buf_.empty(); }
void reset() {
if (!has_closed()) {
close_socket(*socket_);
}
socket_->impl_ = asio::ip::tcp::socket{executor_wrapper_.context()};
if (!socket_->impl_.is_open()) {
std::error_code ec;
socket_->impl_.open(asio::ip::tcp::v4(), ec);
if (ec) {
CINATRA_LOG_WARNING << "client reset socket failed, reason: "
<< ec.message();
return;
}
}
socket_->has_closed_ = true;
#ifdef CINATRA_ENABLE_SSL
need_set_sni_host_ = true;
if (has_init_ssl_) {
socket_->ssl_stream_ = nullptr;
socket_->ssl_stream_ =
std::make_unique<asio::ssl::stream<asio::ip::tcp::socket &>>(
socket_->impl_, *ssl_ctx_);
has_init_ssl_ = false;
}
#endif
#ifdef BENCHMARK_TEST
total_len_ = 0;
#endif
// clear
head_buf_.consume(head_buf_.size());
chunked_buf_.consume(chunked_buf_.size());
resp_chunk_str_.clear();
}
std::string_view get_host() { return host_; }
std::string_view get_port() { return port_; }
private:
async_simple::coro::Lazy<void> send_file_copy_with_chunked(
std::string_view source, std::error_code &ec) {
std::string file_data;
detail::resize(file_data, max_single_part_size_);
coro_io::coro_file file{};
file.open(source, std::ios::in);
if (!file.is_open()) {
ec = std::make_error_code(std::errc::bad_file_descriptor);
co_return;
}
while (!file.eof()) {
auto [rd_ec, rd_size] =
co_await file.async_read(file_data.data(), file_data.size());
std::vector<asio::const_buffer> bufs;
std::string size_str;
cinatra::to_chunked_buffers(bufs, size_str, {file_data.data(), rd_size},
file.eof());
std::size_t size;
if (std::tie(ec, size) = co_await async_write(bufs); ec) {
break;
}
}
}
async_simple::coro::Lazy<void> send_file_copy_with_length(
std::string_view source, std::error_code &ec, std::size_t length,
std::size_t offset) {
if (length <= 0) {
co_return;
}
std::string file_data;
detail::resize(file_data, (std::min)(max_single_part_size_, length));
coro_io::coro_file file{};
file.open(source, std::ios::in);
if (!file.is_open()) {
ec = std::make_error_code(std::errc::bad_file_descriptor);
co_return;
}
file.seek(offset, std::ios::cur);
std::size_t size;
while (length > 0) {
if (std::tie(ec, size) = co_await file.async_read(
file_data.data(), (std::min)(file_data.size(), length));
ec) {
// bad request, file may smaller than content-length
break;
}
length -= size;
if (length > 0 && file.eof()) {
// bad request, file may smaller than content-length
ec = std::make_error_code(std::errc::invalid_argument);
break;
}
if (std::tie(ec, size) =
co_await async_write(asio::buffer(file_data.data(), size));
ec) {
break;
}
}
}
#ifdef __linux__
struct fd_guard {
int fd;
fd_guard(const char *file_path) : fd(::open(file_path, O_RDONLY)) {}
~fd_guard() {
if (fd >= 0) {
::close(fd);
}
}
};
async_simple::coro::Lazy<void> send_file_no_copy_with_length(
const std::filesystem::path &source, std::error_code &ec,
std::size_t length, std::size_t offset) {
fd_guard guard(source.c_str());
if (guard.fd < 0) [[unlikely]] {
ec = std::make_error_code(std::errc::bad_file_descriptor);
co_return;
}
std::size_t actual_len = 0;
std::tie(ec, actual_len) = co_await coro_io::async_sendfile(
socket_->impl_, guard.fd, offset, length);
if (ec) [[unlikely]] {
co_return;
}
if (actual_len != length) [[unlikely]] {
// bad request, file is smaller than content-length
ec = std::make_error_code(std::errc::invalid_argument);
co_return;
}
}
async_simple::coro::Lazy<void> send_file_no_copy_with_chunked(
const std::filesystem::path &source, std::error_code &ec) {
fd_guard guard(source.c_str());
if (guard.fd < 0) [[unlikely]] {
ec = std::make_error_code(std::errc::bad_file_descriptor);
co_return;
}
off_t now_position = 0,
max_position = std::filesystem::file_size(source, ec);
if (ec) {
co_return;
}
size_t len =
std::min<size_t>(max_single_part_size_, max_position - now_position);
// send chunked
std::array<char, 24> chunked_buffer;
std::size_t sz;
std::tie(ec, sz) = co_await async_write(
asio::buffer(get_chuncked_buffers<true, false>(len, chunked_buffer)));
if (ec) [[unlikely]] {
co_return;
}
do {
std::size_t actual_len = 0;
std::tie(ec, actual_len) = co_await coro_io::async_sendfile(
socket_->impl_, guard.fd, now_position, len);
if (ec) [[unlikely]] {
co_return;
}
if (actual_len != len) [[unlikely]] {
// bad request, file is smaller than content-length
ec = std::make_error_code(std::errc::invalid_argument);
co_return;
}
if (now_position += actual_len; now_position < max_position) {
len = std::min<size_t>(max_single_part_size_,
max_position - now_position);
std::tie(ec, sz) = co_await async_write(asio::buffer(
get_chuncked_buffers<false, false>(len, chunked_buffer)));
if (ec) {
co_return;
}
}
else [[unlikely]] {
std::tie(ec, sz) = co_await async_write(asio::buffer(
get_chuncked_buffers<false, true>(len, chunked_buffer)));
if (ec) {
co_return;
}
break;
}
} while (true);
}
#endif
template <typename stream>
static std::size_t getRemainingBytes(stream &file) {
auto current_pos = file.tellg();
file.seekg(0, std::ios::end);
auto end_pos = file.tellg();
auto remaining_bytes = end_pos - current_pos;
file.seekg(current_pos);
return remaining_bytes;
}
template <typename Source>
void check_source(resp_data &data, Source &source) {
if constexpr (is_stream_ptr_v<Source>) {
if (!source) {
data = resp_data{
std::make_error_code(std::errc::no_such_file_or_directory), 404};
}
}
else if constexpr (std::is_same_v<Source, std::string> ||
std::is_same_v<Source, std::string_view>) {
if (!std::filesystem::exists(source)) {
data = resp_data{
std::make_error_code(std::errc::no_such_file_or_directory), 404};
}
}
}
void handle_upload_header_with_multipart() {
size_t content_len = multipart_content_len();
add_header("Content-Length", std::to_string(content_len));
}
void handle_upload_header_with_chunked(
std::unordered_map<std::string, std::string> &headers) {
if (!resp_chunk_str_.empty()) {
resp_chunk_str_.clear();
}
if (headers.empty()) {
add_header("Transfer-Encoding", "chunked");
}
else {
headers.emplace("Transfer-Encoding", "chunked");
}
}
template <typename Source>
int64_t handle_upload_header_with_length(
resp_data &data, Source &source,
std::unordered_map<std::string, std::string> &headers, uint64_t offset,
int64_t content_length) {
if (content_length < 0) {
if constexpr (is_stream_ptr_v<Source>) {
content_length = getRemainingBytes(*source);
}
else if constexpr (std::is_same_v<Source, std::string> ||
std::is_same_v<Source, std::string_view>) {
content_length = std::filesystem::file_size(source);
}
else {
CINATRA_LOG_ERROR
<< "user should set content-length before calling async_upload "
"when source is user-defined function.";
data =
resp_data{std::make_error_code(std::errc::invalid_argument), 404};
return content_length;
}
content_length -= offset;
if (content_length < 0) {
CINATRA_LOG_ERROR << "the offset is larger than the end of file";
data =
resp_data{std::make_error_code(std::errc::invalid_argument), 404};
return content_length;
}
}
assert(content_length >= 0);
char buf[32];
auto [ptr, _] = std::to_chars(buf, buf + 32, content_length);
if (headers.empty()) {
add_header("Content-Length", std::string(buf, ptr - buf));
}
else {
headers.emplace("Content-Length", std::string_view(buf, ptr - buf));
}
return content_length;
}
async_simple::coro::Lazy<void> send_fstream_with_multipart(
std::error_code &ec) {
resp_data data{};
for (auto &[key, part] : form_data_) {
data = co_await send_single_part(key, part);
if (data.net_err) {
ec = data.net_err;
co_return;
}
}
std::string last_part;
size_t size = 0;
last_part.append("--").append(BOUNDARY).append("--").append(CRCF);
if (std::tie(ec, size) = co_await async_write(asio::buffer(last_part));
ec) {
co_return;
}
}
template <typename Source>
async_simple::coro::Lazy<void> send_fstream_with_chunked(
Source &source, std::error_code &ec) {
size_t size = 0;
std::string file_data;
detail::resize(file_data, max_single_part_size_);
while (!source->eof()) {
size_t rd_size =
source->read(file_data.data(), file_data.size()).gcount();
std::vector<asio::const_buffer> bufs;
std::string size_str;
cinatra::to_chunked_buffers(bufs, size_str, {file_data.data(), rd_size},
source->eof());
if (std::tie(ec, size) = co_await async_write(bufs); ec) {
break;
}
}
}
template <typename Source>
async_simple::coro::Lazy<void> send_fstream_with_length(
Source &source, std::error_code &ec, uint64_t offset,
int64_t content_length) {
size_t size = 0;
source->seekg(offset, std::ios::cur);
std::string file_data;
detail::resize(file_data, std::min<std::size_t>(max_single_part_size_,
content_length));
while (content_length > 0 && !source->eof()) {
size_t rd_size =
source
->read(file_data.data(),
std::min<size_t>(content_length, file_data.size()))
.gcount();
if (std::tie(ec, size) =
co_await async_write(asio::buffer(file_data.data(), rd_size));
ec) {
break;
}
content_length -= rd_size;
}
if (!ec && content_length > 0) {
// bad request, file is smaller than content-length
ec = std::make_error_code(std::errc::invalid_argument);
}
}
template <typename Source>
async_simple::coro::Lazy<void> send_sink_with_chunked(Source &source,
std::error_code &ec) {
size_t size = 0;
while (true) {
auto result = co_await source();
std::vector<asio::const_buffer> bufs;
std::string size_str;
cinatra::to_chunked_buffers(
bufs, size_str, {result.buf.data(), result.buf.size()}, result.eof);
if (std::tie(ec, size) = co_await async_write(bufs); ec) {
break;
}
if (result.eof) {
break;
}
}
}
template <typename Source>
async_simple::coro::Lazy<void> send_sink_with_length(Source &source,
std::error_code &ec,
int64_t content_length) {
size_t size = 0;
while (true) {
auto result = co_await source();
if (std::tie(ec, size) = co_await async_write(asio::buffer(
result.buf.data(),
std::min<std::size_t>(content_length, result.buf.size())));
ec) {
break;
}
content_length -= size;
if (content_length <= 0) {
break;
}
else if (result.eof) [[unlikely]] {
// bad request, file is smaller than content-length
ec = std::make_error_code(std::errc::invalid_argument);
break;
}
}
}
async_simple::coro::Lazy<bool> reconnect(resp_data &data, uri_t u) {
{
auto guard = timer_guard(this, conn_timeout_duration_, "connect timer");
data = co_await connect(u);
}
if (socket_->is_timeout_) {
data = resp_data{std::make_error_code(std::errc::timed_out), 404};
}
if (data.net_err) {
co_return false;
}
co_return true;
}
void handle_upload_timeout_error(std::error_code &ec) {
#ifdef INJECT_FOR_HTTP_CLIENT_TEST
if (write_header_timeout_ || write_payload_timeout_ || read_timeout_) {
socket_->is_timeout_ = true;
}
#endif
if (socket_->is_timeout_) {
ec = std::make_error_code(std::errc::timed_out);
}
}
template <upload_type_t upload_type, typename S, typename Source>
async_simple::coro::Lazy<resp_data> async_upload_impl(
S uri, http_method method, Source source /* file */,
req_content_type content_type = req_content_type::text,
std::unordered_map<std::string, std::string> headers = {},
uint64_t offset = 0 /*file offset*/,
int64_t content_length = -1 /*upload size*/) {
std::error_code ec{};
size_t size = 0;
bool is_keep_alive = true;
req_context<> ctx{content_type};
resp_data data{};
out_buf_ = {};
std::shared_ptr<void> guard(nullptr, [&, this](auto) {
if (!req_headers_.empty()) {
req_headers_.clear();
}
});
auto [ok, u] = handle_uri(data, uri);
if (!ok) {
co_return resp_data{std::make_error_code(std::errc::protocol_error), 404};
}
if constexpr (upload_type != upload_type_t::multipart) {
check_source(data, source);
if (data.status != 0) {
co_return data;
}
}
if constexpr (upload_type == upload_type_t::with_length) {
content_length = handle_upload_header_with_length(data, source, headers,
offset, content_length);
if (data.status != 0) {
co_return data;
}
}
else if constexpr (upload_type == upload_type_t::chunked) {
handle_upload_header_with_chunked(headers);
}
else if constexpr (upload_type == upload_type_t::multipart) {
handle_upload_header_with_multipart();
}
std::string header_str =
build_request_header(u, method, ctx, true, std::move(headers));
if (socket_->has_closed_) {
if (bool r = co_await reconnect(data, u); !r) {
co_return data;
}
}
auto time_guard = timer_guard(this, req_timeout_duration_, "request timer");
std::tie(ec, size) = co_await async_write(asio::buffer(header_str));
if (ec) {
handle_upload_timeout_error(ec);
co_return resp_data{ec, 404};
}
constexpr bool is_stream_file = is_stream_ptr_v<Source>;
if constexpr (is_stream_file) {
if constexpr (upload_type == upload_type_t::with_length) {
co_await send_fstream_with_length(source, ec, offset, content_length);
}
else if constexpr (upload_type == upload_type_t::chunked) {
co_await send_fstream_with_chunked(source, ec);
}
}
else if constexpr (std::is_enum_v<Source>) { // only for multipart
co_await send_fstream_with_multipart(ec);
}
else if constexpr (std::is_same_v<Source, std::string> ||
std::is_same_v<Source, std::string_view>) {
#ifdef __linux__
#ifdef CINATRA_ENABLE_SSL
if (!has_init_ssl_) {
#endif
if constexpr (upload_type == upload_type_t::with_length) {
co_await send_file_no_copy_with_length(std::filesystem::path{source},
ec, content_length, offset);
}
else if constexpr (upload_type == upload_type_t::chunked) {
co_await send_file_no_copy_with_chunked(std::filesystem::path{source},
ec);
}
#ifdef CINATRA_ENABLE_SSL
}
else {
if constexpr (upload_type == upload_type_t::with_length) {
co_await send_file_copy_with_length(source, ec, content_length,
offset);
}
else if constexpr (upload_type == upload_type_t::chunked) {
co_await send_file_copy_with_chunked(source, ec);
}
}
#endif
#else
if constexpr (upload_type == upload_type_t::with_length) {
co_await send_file_copy_with_length(source, ec, content_length, offset);
}
else if constexpr (upload_type == upload_type_t::chunked) {
co_await send_file_copy_with_chunked(source, ec);
}
#endif
}
else {
if constexpr (upload_type == upload_type_t::with_length) {
co_await send_sink_with_length(source, ec, content_length);
}
else if constexpr (upload_type == upload_type_t::chunked) {
co_await send_sink_with_chunked(source, ec);
}
}
if (ec) {
handle_upload_timeout_error(ec);
co_return resp_data{ec, 404};
}
data = co_await handle_read(ec, size, is_keep_alive, std::move(ctx),
http_method::POST);
if (ec) {
handle_upload_timeout_error(ec);
}
handle_result(data, ec, is_keep_alive);
co_return data;
}
public:
// send file with length
template <typename S, typename Source>
async_simple::coro::Lazy<resp_data> async_upload(
S uri, http_method method, Source source /* file */,
uint64_t offset = 0 /*file offset*/,
int64_t content_length = -1 /*upload size*/,
req_content_type content_type = req_content_type::text,
std::unordered_map<std::string, std::string> headers = {}) {
return async_upload_impl<upload_type_t::with_length>(
std::move(uri), method, std::move(source), content_type,
std::move(headers), offset, content_length);
}
// send file with chunked
template <typename S, typename Source>
async_simple::coro::Lazy<resp_data> async_upload_chunked(
S uri, http_method method, Source source,
req_content_type content_type = req_content_type::text,
std::unordered_map<std::string, std::string> headers = {}) {
return async_upload_impl<upload_type_t::chunked>(
std::move(uri), method, std::move(source), content_type,
std::move(headers));
}
// send multipart data, should call add_file_part or add_str_part firstly.
async_simple::coro::Lazy<resp_data> async_upload_multipart(std::string uri) {
if (form_data_.empty()) {
CINATRA_LOG_WARNING << "no multipart";
co_return resp_data{std::make_error_code(std::errc::invalid_argument),
404};
}
co_return co_await async_upload_impl<upload_type_t::multipart>(
std::move(uri), http_method::POST, upload_type_t::multipart,
req_content_type::multipart);
}
async_simple::coro::Lazy<resp_data> async_upload_multipart(
std::string uri, std::string name, std::string filename) {
if (!add_file_part(std::move(name), std::move(filename))) {
CINATRA_LOG_WARNING << "open file failed or duplicate test names";
co_return resp_data{std::make_error_code(std::errc::invalid_argument),
404};
}
co_return co_await async_upload_multipart(std::move(uri));
}
template <typename S, typename String>
async_simple::coro::Lazy<resp_data> async_request(
S uri, http_method method, req_context<String> ctx,
std::unordered_map<std::string, std::string> headers = {},
std::span<char> out_buf = {}) {
if (!resp_chunk_str_.empty()) {
resp_chunk_str_.clear();
}
if (!body_.empty()) {
body_.clear();
}
out_buf_ = out_buf;
std::shared_ptr<int> guard(nullptr, [this](auto) {
if (!req_headers_.empty()) {
req_headers_.clear();
}
});
resp_data data{};
std::error_code ec{};
size_t size = 0;
bool is_keep_alive = true;
do {
uri_t u;
std::string append_uri;
if (socket_->has_closed_ || (!uri.empty() && uri[0] != '/')) {
bool no_schema = !has_schema(uri);
if (no_schema) {
#ifdef CINATRA_ENABLE_SSL
if (is_ssl_schema_) {
append_uri.append("https://").append(uri);
}
else
#endif
{
append_uri.append("http://").append(uri);
}
}
bool ok = false;
std::tie(ok, u) = handle_uri(data, no_schema ? append_uri : uri);
if (!ok) {
break;
}
}
else {
u.path = uri;
}
if (socket_->has_closed_) {
data = co_await connect(u);
if (data.status != 0) {
co_return data;
}
}
std::vector<asio::const_buffer> vec;
std::string req_head_str =
build_request_header(u, method, ctx, false, std::move(headers));
bool has_body = !ctx.content.empty();
if (has_body) {
vec.push_back(asio::buffer(req_head_str));
vec.push_back(asio::buffer(ctx.content.data(), ctx.content.size()));
}
#ifdef CORO_HTTP_PRINT_REQ_HEAD
CINATRA_LOG_DEBUG << req_head_str;
#endif
auto guard = timer_guard(this, req_timeout_duration_, "request timer");
if (has_body) {
std::tie(ec, size) = co_await async_write(vec);
}
else {
std::tie(ec, size) = co_await async_write(asio::buffer(req_head_str));
}
if (ec) {
break;
}
data =
co_await handle_read(ec, size, is_keep_alive, std::move(ctx), method);
} while (0);
if (ec && socket_->is_timeout_) {
ec = std::make_error_code(std::errc::timed_out);
}
handle_result(data, ec, is_keep_alive);
co_return data;
}
async_simple::coro::Lazy<std::error_code> handle_shake() {
#ifdef CINATRA_ENABLE_SSL
if (!has_init_ssl_) {
bool r = init_ssl(asio::ssl::verify_none, "", host_);
if (!r) {
co_return std::make_error_code(std::errc::invalid_argument);
}
}
if (socket_->ssl_stream_ == nullptr) {
co_return std::make_error_code(std::errc::not_a_stream);
}
auto ec = co_await coro_io::async_handshake(socket_->ssl_stream_,
asio::ssl::stream_base::client);
if (ec) {
CINATRA_LOG_ERROR << "handle failed " << ec.message();
}
co_return ec;
#else
// please open CINATRA_ENABLE_SSL before request https!
co_return std::make_error_code(std::errc::protocol_error);
#endif
}
#ifdef INJECT_FOR_HTTP_CLIENT_TEST
async_simple::coro::Lazy<std::error_code> async_write_raw(
std::string_view data) {
auto [ec, _] = co_await async_write(asio::buffer(data));
co_return ec;
}
async_simple::coro::Lazy<resp_data> async_read_raw(
http_method method, bool clear_buffer = false) {
if (clear_buffer) {
body_.clear();
}
char buf[1024];
std::error_code ec{};
size_t size{};
#ifdef CINATRA_ENABLE_SSL
if (has_init_ssl_) {
std::tie(ec, size) = co_await coro_io::async_read_some(
*socket_->ssl_stream_, asio::buffer(buf, 1024));
}
else {
#endif
std::tie(ec, size) = co_await coro_io::async_read_some(
socket_->impl_, asio::buffer(buf, 1024));
#ifdef CINATRA_ENABLE_SSL
}
#endif
body_.append(buf, size);
co_return resp_data{ec, {}, {}, body_};
}
#endif
inline void set_proxy(const std::string &host, const std::string &port) {
proxy_host_ = host;
proxy_port_ = port;
}
inline void set_proxy_basic_auth(const std::string &username,
const std::string &password) {
proxy_basic_auth_username_ = username;
proxy_basic_auth_password_ = password;
}
inline void set_proxy_bearer_token_auth(const std::string &token) {
proxy_bearer_token_auth_token_ = token;
}
inline void enable_auto_redirect(bool enable_follow_redirect) {
enable_follow_redirect_ = enable_follow_redirect;
}
#ifdef CINATRA_ENABLE_SSL
void set_ssl_schema(bool r) { is_ssl_schema_ = r; }
#endif
std::string get_redirect_uri() { return redirect_uri_; }
bool is_redirect(resp_data &data) {
if (data.status > 299 && data.status <= 399)
return true;
return false;
}
void set_conn_timeout(std::chrono::steady_clock::duration timeout_duration) {
conn_timeout_duration_ = timeout_duration;
}
void set_req_timeout(std::chrono::steady_clock::duration timeout_duration) {
req_timeout_duration_ = timeout_duration;
}
#ifdef CINATRA_ENABLE_SSL
void enable_sni_hostname(bool r) { need_set_sni_host_ = r; }
#endif
template <typename T, typename U>
friend class coro_io::client_pool;
private:
struct socket_t {
asio::ip::tcp::socket impl_;
std::atomic<bool> has_closed_ = true;
std::atomic<bool> is_timeout_ = false;
asio::streambuf head_buf_;
asio::streambuf chunked_buf_;
#ifdef CINATRA_ENABLE_SSL
std::unique_ptr<asio::ssl::stream<asio::ip::tcp::socket &>> ssl_stream_;
#endif
template <typename ioc_t>
socket_t(ioc_t &&ioc) : impl_(std::forward<ioc_t>(ioc)) {}
};
static bool is_ok(const resp_data &data) noexcept {
return data.net_err == std::error_code{};
}
template <typename S>
std::pair<bool, uri_t> handle_uri(resp_data &data, const S &uri) {
uri_t u;
if (!u.parse_from(uri.data())) {
CINATRA_LOG_WARNING
<< uri
<< ", the url is not right, maybe need to encode the url firstly";
data.net_err = std::make_error_code(std::errc::protocol_error);
data.status = 404;
return {false, {}};
}
// construct proxy request uri
construct_proxy_uri(u);
return {true, u};
}
void construct_proxy_uri(uri_t &u) {
if (!proxy_host_.empty() && !proxy_port_.empty()) {
if (!proxy_request_uri_.empty())
proxy_request_uri_.clear();
if (u.get_port() == "80") {
proxy_request_uri_.append("http://").append(u.get_host()).append(":80");
}
else if (u.get_port() == "443") {
proxy_request_uri_.append("https://")
.append(u.get_host())
.append(":443");
}
else {
// all be http
proxy_request_uri_.append("http://")
.append(u.get_host())
.append(":")
.append(u.get_port());
}
proxy_request_uri_.append(u.get_path());
u.path = std::string_view(proxy_request_uri_);
}
}
std::string build_request_header(
const uri_t &u, http_method method, const auto &ctx,
bool already_has_len = false,
std::unordered_map<std::string, std::string> headers = {}) {
std::string req_str(method_name(method));
req_str.append(" ").append(u.get_path());
if (!u.query.empty()) {
req_str.append("?").append(u.query);
}
if (!headers.empty()) {
req_headers_ = std::move(headers);
req_str.append(" HTTP/1.1\r\n");
}
else {
if (req_headers_.find("Host") == req_headers_.end()) {
req_str.append(" HTTP/1.1\r\nHost:").append(u.host).append("\r\n");
}
else {
req_str.append(" HTTP/1.1\r\n");
}
}
auto type_str = get_content_type_str(ctx.content_type);
if (!type_str.empty()) {
if (ctx.content_type == req_content_type::multipart) {
type_str.append(BOUNDARY);
}
req_headers_["Content-Type"] = std::move(type_str);
}
bool has_connection = false;
// add user headers
if (!req_headers_.empty()) {
for (auto &pair : req_headers_) {
if (pair.first == "Connection") {
has_connection = true;
}
req_str.append(pair.first)
.append(": ")
.append(pair.second)
.append("\r\n");
}
}
if (!has_connection) {
req_str.append("Connection: keep-alive\r\n");
}
if (!proxy_basic_auth_username_.empty() &&
!proxy_basic_auth_password_.empty()) {
std::string basic_auth_str = "Proxy-Authorization: Basic ";
std::string basic_base64_str = base64_encode(
proxy_basic_auth_username_ + ":" + proxy_basic_auth_password_);
req_str.append(basic_auth_str).append(basic_base64_str).append(CRCF);
}
if (!proxy_bearer_token_auth_token_.empty()) {
std::string bearer_token_str = "Proxy-Authorization: Bearer ";
req_str.append(bearer_token_str)
.append(proxy_bearer_token_auth_token_)
.append(CRCF);
}
if (!ctx.req_header.empty())
req_str.append(ctx.req_header);
size_t content_len = ctx.content.size();
bool should_add_len = false;
if (content_len > 0) {
should_add_len = true;
}
else {
if ((method == http_method::POST || method == http_method::PUT) &&
ctx.content_type != req_content_type::multipart) {
should_add_len = true;
}
}
if (req_headers_.find("Content-Length") != req_headers_.end()) {
should_add_len = false;
}
if (already_has_len) {
should_add_len = false;
}
if (should_add_len) {
char buf[32];
auto [ptr, ec] = std::to_chars(buf, buf + 32, content_len);
req_str.append("Content-Length: ")
.append(std::string_view(buf, ptr - buf))
.append("\r\n");
}
req_str.append("\r\n");
return req_str;
}
std::error_code handle_header(resp_data &data, http_parser &parser,
size_t header_size) {
// parse header
const char *data_ptr = asio::buffer_cast<const char *>(head_buf_.data());
int parse_ret = parser.parse_response(data_ptr, header_size, 0);
#ifdef INJECT_FOR_HTTP_CLIENT_TEST
if (parse_failed_forever_) {
parse_ret = -1;
}
#endif
if (parse_ret < 0) [[unlikely]] {
head_buf_.consume(head_buf_.size());
return std::make_error_code(std::errc::protocol_error);
}
if (parser_.body_len() > max_http_body_len_ || parser_.body_len() < 0)
[[unlikely]] {
CINATRA_LOG_ERROR << "invalid http content length: "
<< parser_.body_len();
head_buf_.consume(head_buf_.size());
return std::make_error_code(std::errc::invalid_argument);
}
head_buf_.consume(header_size); // header size
data.resp_headers = parser.get_headers();
data.status = parser.status();
return {};
}
template <typename String>
async_simple::coro::Lazy<resp_data> handle_read(std::error_code &ec,
size_t &size,
bool &is_keep_alive,
req_context<String> ctx,
http_method method) {
resp_data data{};
do {
if (std::tie(ec, size) = co_await async_read_until(head_buf_, TWO_CRCF);
ec) {
break;
}
ec = handle_header(data, parser_, size);
if (ec) {
break;
}
is_keep_alive = parser_.keep_alive();
if (method == http_method::HEAD) {
co_return data;
}
bool is_out_buf = false;
bool is_ranges = parser_.is_resp_ranges();
if (is_ranges) {
is_keep_alive = true;
}
if (parser_.is_chunked()) {
out_buf_ = {};
is_keep_alive = true;
if (head_buf_.size() > 0) {
const char *data_ptr =
asio::buffer_cast<const char *>(head_buf_.data());
chunked_buf_.sputn(data_ptr, head_buf_.size());
head_buf_.consume(head_buf_.size());
}
ec = co_await handle_chunked(data, std::move(ctx));
break;
}
if (parser_.is_multipart()) {
out_buf_ = {};
is_keep_alive = true;
if (head_buf_.size() > 0) {
const char *data_ptr =
asio::buffer_cast<const char *>(head_buf_.data());
chunked_buf_.sputn(data_ptr, head_buf_.size());
head_buf_.consume(head_buf_.size());
}
ec = co_await handle_multipart(data, std::move(ctx));
break;
}
redirect_uri_.clear();
bool is_redirect = parser_.is_location();
if (is_redirect)
redirect_uri_ = parser_.get_header_value("Location");
if (!parser_.get_header_value("Content-Encoding").empty()) {
if (parser_.get_header_value("Content-Encoding").find("gzip") !=
std::string_view::npos)
encoding_type_ = content_encoding::gzip;
else if (parser_.get_header_value("Content-Encoding").find("deflate") !=
std::string_view::npos)
encoding_type_ = content_encoding::deflate;
else if (parser_.get_header_value("Content-Encoding").find("br") !=
std::string_view::npos)
encoding_type_ = content_encoding::br;
}
else {
encoding_type_ = content_encoding::none;
}
size_t content_len = (size_t)parser_.body_len();
#ifdef BENCHMARK_TEST
total_len_ = parser_.total_len();
#endif
is_out_buf = !out_buf_.empty();
if (is_out_buf) {
if (content_len > 0 && out_buf_.size() < content_len) {
out_buf_ = {};
is_out_buf = false;
}
}
if (content_len <= head_buf_.size()) {
// Now get entire content, additional data will discard.
// copy body.
if (content_len > 0) {
auto data_ptr = asio::buffer_cast<const char *>(head_buf_.data());
if (is_out_buf) {
memcpy(out_buf_.data(), data_ptr, content_len);
}
else {
detail::resize(body_, content_len);
memcpy(body_.data(), data_ptr, content_len);
}
head_buf_.consume(head_buf_.size());
}
co_await handle_entire_content(data, content_len, is_ranges, ctx);
break;
}
// read left part of content.
size_t part_size = head_buf_.size();
size_t size_to_read = content_len - part_size;
auto data_ptr = asio::buffer_cast<const char *>(head_buf_.data());
if (is_out_buf) {
memcpy(out_buf_.data(), data_ptr, part_size);
}
else {
detail::resize(body_, content_len);
memcpy(body_.data(), data_ptr, part_size);
}
head_buf_.consume(part_size);
if (is_out_buf) {
if (std::tie(ec, size) = co_await async_read(
asio::buffer(out_buf_.data() + part_size, size_to_read),
size_to_read);
ec) {
break;
}
}
else {
if (std::tie(ec, size) = co_await async_read(
asio::buffer(body_.data() + part_size, size_to_read),
size_to_read);
ec) {
break;
}
}
// Now get entire content, additional data will discard.
co_await handle_entire_content(data, content_len, is_ranges, ctx);
} while (0);
if (!resp_chunk_str_.empty()) {
data.resp_body =
std::string_view{resp_chunk_str_.data(), resp_chunk_str_.size()};
}
co_return data;
}
async_simple::coro::Lazy<void> handle_entire_content(resp_data &data,
size_t content_len,
bool is_ranges,
auto &ctx) {
if (content_len > 0) {
const char *data_ptr;
if (head_buf_.size() == 0) {
if (out_buf_.empty()) {
data_ptr = body_.data();
}
else {
data_ptr = out_buf_.data();
}
}
else {
data_ptr = asio::buffer_cast<const char *>(head_buf_.data());
}
if (is_ranges) {
if (ctx.resp_body_stream) {
auto [ec, size] = co_await ctx.resp_body_stream->async_write(
{data_ptr, content_len});
if (ec) {
data.net_err = ec;
co_return;
}
}
}
std::string_view reply(data_ptr, content_len);
#ifdef CINATRA_ENABLE_GZIP
if (encoding_type_ == content_encoding::gzip) {
uncompressed_str_.clear();
bool r = gzip_codec::uncompress(reply, uncompressed_str_);
if (r)
data.resp_body = uncompressed_str_;
else
data.resp_body = reply;
}
else if (encoding_type_ == content_encoding::deflate) {
uncompressed_str_.clear();
bool r = gzip_codec::inflate(reply, uncompressed_str_);
if (r)
data.resp_body = uncompressed_str_;
else
data.resp_body = reply;
}
#endif
#if defined(CINATRA_ENABLE_BROTLI) && defined(CINATRA_ENABLE_GZIP)
else if (encoding_type_ == content_encoding::br)
#endif
#if defined(CINATRA_ENABLE_BROTLI) && !defined(CINATRA_ENABLE_GZIP)
if (encoding_type_ == content_encoding::br)
#endif
#ifdef CINATRA_ENABLE_BROTLI
{
uncompressed_str_.clear();
bool r = br_codec::brotli_decompress(reply, uncompressed_str_);
if (r)
data.resp_body = uncompressed_str_;
else
data.resp_body = reply;
}
#endif
#if defined(CINATRA_ENABLE_BROTLI) || defined(CINATRA_ENABLE_GZIP)
else
#endif
data.resp_body = reply;
head_buf_.consume(content_len);
}
data.eof = (head_buf_.size() == 0);
}
void handle_result(resp_data &data, std::error_code ec, bool is_keep_alive) {
if (ec) {
close_socket(*socket_);
data.net_err = ec;
data.status = 404;
#ifdef BENCHMARK_TEST
if (!stop_bench_) {
CINATRA_LOG_DEBUG << ec.message();
}
#endif
}
else {
if (!is_keep_alive) {
close_socket(*socket_);
}
}
}
template <typename String>
async_simple::coro::Lazy<std::error_code> handle_multipart(
resp_data &data, req_context<String> ctx) {
std::error_code ec{};
std::string boundary = std::string{parser_.get_boundary()};
multipart_reader_t multipart(this);
while (true) {
auto part_head = co_await multipart.read_part_head(boundary);
if (part_head.ec) {
co_return part_head.ec;
}
auto part_body = co_await multipart.read_part_body(boundary);
if (ctx.resp_body_stream) {
size_t size;
std::tie(ec, size) =
co_await ctx.resp_body_stream->async_write(part_body.data);
}
else {
resp_chunk_str_.append(part_body.data.data(), part_body.data.size());
}
if (part_body.ec) {
co_return part_body.ec;
}
if (part_body.eof) {
break;
}
}
co_return ec;
}
template <typename String>
async_simple::coro::Lazy<std::error_code> handle_chunked(
resp_data &data, req_context<String> ctx) {
std::error_code ec{};
size_t size = 0;
while (true) {
if (std::tie(ec, size) = co_await async_read_until(chunked_buf_, CRCF);
ec) {
break;
}
size_t buf_size = chunked_buf_.size();
size_t additional_size = buf_size - size;
const char *data_ptr =
asio::buffer_cast<const char *>(chunked_buf_.data());
std::string_view size_str(data_ptr, size - CRCF.size());
auto chunk_size = hex_to_int(size_str);
chunked_buf_.consume(size);
if (chunk_size < 0) {
CINATRA_LOG_DEBUG << "bad chunked size";
ec = asio::error::make_error_code(
asio::error::basic_errors::invalid_argument);
break;
}
if (additional_size < size_t(chunk_size + 2)) {
// not a complete chunk, read left chunk data.
size_t size_to_read = chunk_size + 2 - additional_size;
if (std::tie(ec, size) =
co_await async_read(chunked_buf_, size_to_read);
ec) {
break;
}
}
if (chunk_size == 0) {
// all finished, no more data
chunked_buf_.consume(chunked_buf_.size());
data.eof = true;
break;
}
data_ptr = asio::buffer_cast<const char *>(chunked_buf_.data());
if (ctx.resp_body_stream) {
std::tie(ec, size) = co_await ctx.resp_body_stream->async_write(
{data_ptr, (size_t)chunk_size});
}
else {
resp_chunk_str_.append(data_ptr, chunk_size);
}
chunked_buf_.consume(chunk_size + CRCF.size());
}
co_return ec;
}
async_simple::coro::Lazy<resp_data> connect(
const uri_t &u, std::vector<asio::ip::tcp::endpoint> *eps = nullptr) {
std::vector<asio::ip::tcp::endpoint> eps_tmp;
if (eps == nullptr) {
eps = &eps_tmp;
}
if (socket_->has_closed_) {
socket_->is_timeout_ = false;
host_ = proxy_host_.empty() ? u.get_host() : proxy_host_;
port_ = proxy_port_.empty() ? u.get_port() : proxy_port_;
if (eps->empty()) {
CINATRA_LOG_TRACE << "start resolve host: " << host_ << ":" << port_;
auto [ec, iter] = co_await coro_io::async_resolve(
&executor_wrapper_, socket_->impl_, host_, port_);
if (ec) {
co_return resp_data{ec, 404};
}
else {
asio::ip::tcp::resolver::iterator end;
while (iter != end) {
eps->push_back(iter->endpoint());
++iter;
}
if (eps->empty()) [[unlikely]] {
co_return resp_data{std::make_error_code(std::errc::not_connected),
404};
}
}
}
CINATRA_LOG_TRACE
<< "start connect to endpoint lists. total endpoint count:"
<< eps->size()
<< ", the first endpoint is: " << (*eps)[0].address().to_string()
<< ":" << std::to_string((*eps)[0].port());
std::error_code ec;
asio::ip::tcp::endpoint endpoint;
if (std::tie(ec, endpoint) = co_await coro_io::async_connect(
&executor_wrapper_, socket_->impl_, *eps);
ec) {
co_return resp_data{ec, 404};
}
#ifdef INJECT_FOR_HTTP_CLIENT_TEST
if (connect_timeout_forever_) {
socket_->is_timeout_ = true;
}
#endif
if (socket_->is_timeout_) {
ec = std::make_error_code(std::errc::timed_out);
co_return resp_data{ec, 404};
}
if (enable_tcp_no_delay_) {
socket_->impl_.set_option(asio::ip::tcp::no_delay(true), ec);
if (ec) {
co_return resp_data{ec, 404};
}
}
if (u.is_ssl) {
#ifdef CINATRA_ENABLE_SSL
if (!has_init_ssl_) {
size_t pos = u.host.find("www.");
std::string host;
if (pos != std::string_view::npos) {
host = std::string{u.host.substr(pos + 4)};
}
else {
host = std::string{u.host};
}
bool r = init_ssl(asio::ssl::verify_none, "", host);
if (!r) {
co_return resp_data{
std::make_error_code(std::errc::invalid_argument), 404};
}
}
#endif
if (ec = co_await handle_shake(); ec) {
co_return resp_data{ec, 404};
}
}
socket_->has_closed_ = false;
CINATRA_LOG_TRACE << "connect to endpoint: "
<< endpoint.address().to_string() << ":"
<< std::to_string(endpoint.port()) << " successfully";
}
co_return resp_data{};
}
size_t multipart_content_len() {
size_t content_len = 0;
for (auto &[key, part] : form_data_) {
content_len += 75;
content_len += key.size() + 1;
if (!part.filename.empty()) {
content_len += (12 + part.filename.size() + 1);
auto ext = std::filesystem::path(part.filename).extension().string();
if (auto it = g_content_type_map.find(ext);
it != g_content_type_map.end()) {
content_len += (14 + it->second.size());
}
}
content_len += 4;
content_len += (part.size + 2);
}
content_len += (6 + BOUNDARY.size());
return content_len;
}
async_simple::coro::Lazy<resp_data> send_single_part(
const std::string &key, const multipart_t &part) {
std::string part_content_head;
part_content_head.append("--").append(BOUNDARY).append(CRCF);
part_content_head.append("Content-Disposition: form-data; name=\"");
part_content_head.append(key).append("\"");
bool is_file = !part.filename.empty();
std::string short_name =
std::filesystem::path(part.filename).filename().string();
if (is_file) {
part_content_head.append("; filename=\"")
.append(short_name)
.append("\"")
.append(CRCF);
auto ext = std::filesystem::path(short_name).extension().string();
if (auto it = g_content_type_map.find(ext);
it != g_content_type_map.end()) {
part_content_head.append("Content-Type: ")
.append(it->second)
.append(CRCF);
}
part_content_head.append(CRCF);
}
else {
part_content_head.append(TWO_CRCF);
}
if (auto [ec, size] = co_await async_write(asio::buffer(part_content_head));
ec) {
co_return resp_data{ec, 404};
}
if (is_file) {
coro_io::coro_file file{};
file.open(part.filename, std::ios::in);
assert(file.is_open());
std::string file_data;
detail::resize(file_data, max_single_part_size_);
while (true) {
auto [rd_ec, rd_size] =
co_await file.async_read(file_data.data(), file_data.size());
if (auto [ec, size] =
co_await async_write(asio::buffer(file_data.data(), rd_size));
ec) {
co_return resp_data{ec, 404};
}
if (file.eof()) {
if (auto [ec, size] = co_await async_write(asio::buffer(CRCF)); ec) {
co_return resp_data{ec, 404};
}
break;
}
}
}
else {
std::array<asio::const_buffer, 2> arr{asio::buffer(part.content),
asio::buffer(CRCF)};
if (auto [ec, size] = co_await async_write(arr); ec) {
co_return resp_data{ec, 404};
}
}
co_return resp_data{{}, 200};
}
async_simple::coro::Lazy<resp_data> async_read_ws() {
resp_data data{};
head_buf_.consume(head_buf_.size());
std::shared_ptr sock = socket_;
asio::streambuf &read_buf = sock->head_buf_;
bool has_init_ssl = false;
#ifdef CINATRA_ENABLE_SSL
has_init_ssl = has_init_ssl_;
#endif
websocket ws{};
while (true) {
if (auto [ec, _] = co_await async_read_ws(
sock, read_buf, ws.left_header_len(), has_init_ssl);
ec) {
if (socket_->is_timeout_) {
co_return resp_data{std::make_error_code(std::errc::timed_out), 404};
}
data.net_err = ec;
data.status = 404;
if (sock->has_closed_) {
co_return data;
}
close_socket(*sock);
co_return data;
}
const char *data_ptr = asio::buffer_cast<const char *>(read_buf.data());
auto ret = ws.parse_header(data_ptr, read_buf.size(), false);
if (ret == ws_header_status::incomplete) {
continue;
}
else if (ret == ws_header_status::error) {
data.net_err = std::make_error_code(std::errc::protocol_error);
data.status = 404;
close_socket(*sock);
co_return data;
}
frame_header *header = (frame_header *)data_ptr;
bool is_close_frame = header->opcode == opcode::close;
read_buf.consume(read_buf.size());
size_t payload_len = ws.payload_length();
if (auto [ec, size] =
co_await async_read_ws(sock, read_buf, payload_len, has_init_ssl);
ec) {
data.net_err = ec;
data.status = 404;
close_socket(*sock);
co_return data;
}
data_ptr = asio::buffer_cast<const char *>(read_buf.data());
#ifdef CINATRA_ENABLE_GZIP
if (is_server_support_ws_deflate_ && enable_ws_deflate_) {
inflate_str_.clear();
if (!cinatra::gzip_codec::inflate({data_ptr, payload_len},
inflate_str_)) {
CINATRA_LOG_ERROR << "uncompuress data error";
data.status = 404;
data.net_err = std::make_error_code(std::errc::protocol_error);
co_return data;
}
data_ptr = inflate_str_.data();
payload_len = inflate_str_.length();
}
#endif
if (is_close_frame) {
if (payload_len >= 2) {
payload_len -= 2;
data_ptr += sizeof(uint16_t);
}
}
data.status = 200;
data.resp_body = {data_ptr, payload_len};
read_buf.consume(read_buf.size());
if (is_close_frame) {
std::string reason = "close";
auto close_str = ws.format_close_payload(close_code::normal,
reason.data(), reason.size());
auto span = std::span<char>(close_str);
auto encode_header = ws.encode_frame(span, opcode::close, true);
std::vector<asio::const_buffer> buffers{asio::buffer(encode_header),
asio::buffer(reason)};
co_await async_write_ws(sock, buffers, has_init_ssl);
close_socket(*sock);
data.net_err = asio::error::eof;
data.status = 404;
co_return data;
}
co_return data;
}
}
template <typename AsioBuffer>
async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_read_ws(
auto sock, AsioBuffer &&buffer, size_t size_to_read,
bool has_init_ssl = false) noexcept {
#ifdef CINATRA_ENABLE_SSL
if (has_init_ssl) {
return coro_io::async_read(*sock->ssl_stream_, buffer, size_to_read);
}
else {
#endif
return coro_io::async_read(sock->impl_, buffer, size_to_read);
#ifdef CINATRA_ENABLE_SSL
}
#endif
}
template <typename AsioBuffer>
async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_write_ws(
auto sock, AsioBuffer &&buffer, bool has_init_ssl = false) {
#ifdef CINATRA_ENABLE_SSL
if (has_init_ssl) {
return coro_io::async_write(*sock->ssl_stream_, buffer);
}
else {
#endif
return coro_io::async_write(sock->impl_, buffer);
#ifdef CINATRA_ENABLE_SSL
}
#endif
}
template <typename AsioBuffer>
async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_read(
AsioBuffer &&buffer, size_t size_to_read) noexcept {
#ifdef INJECT_FOR_HTTP_CLIENT_TEST
if (read_failed_forever_) {
return async_read_failed();
}
#endif
#ifdef CINATRA_ENABLE_SSL
if (has_init_ssl_) {
return coro_io::async_read(*socket_->ssl_stream_, buffer, size_to_read);
}
else {
#endif
return coro_io::async_read(socket_->impl_, buffer, size_to_read);
#ifdef CINATRA_ENABLE_SSL
}
#endif
}
#ifdef INJECT_FOR_HTTP_CLIENT_TEST
async_simple::coro::Lazy<std::pair<std::error_code, size_t>>
async_write_failed() {
co_return std::make_pair(std::make_error_code(std::errc::io_error), 0);
}
async_simple::coro::Lazy<std::pair<std::error_code, size_t>>
async_read_failed() {
co_return std::make_pair(std::make_error_code(std::errc::io_error), 0);
}
#endif
template <typename AsioBuffer>
async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_write(
AsioBuffer &&buffer) {
#ifdef INJECT_FOR_HTTP_CLIENT_TEST
if (write_failed_forever_) {
return async_write_failed();
}
#endif
#ifdef CINATRA_ENABLE_SSL
if (has_init_ssl_) {
return coro_io::async_write(*socket_->ssl_stream_, buffer);
}
else {
#endif
return coro_io::async_write(socket_->impl_, buffer);
#ifdef CINATRA_ENABLE_SSL
}
#endif
}
template <typename AsioBuffer>
async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_read_until(
AsioBuffer &buffer, asio::string_view delim) noexcept {
#ifdef INJECT_FOR_HTTP_CLIENT_TEST
if (read_failed_forever_) {
return async_read_failed();
}
#endif
#ifdef CINATRA_ENABLE_SSL
if (has_init_ssl_) {
return coro_io::async_read_until(*socket_->ssl_stream_, buffer, delim);
}
else {
#endif
return coro_io::async_read_until(socket_->impl_, buffer, delim);
#ifdef CINATRA_ENABLE_SSL
}
#endif
}
static void close_socket(socket_t &socket) {
std::error_code ec;
socket.impl_.shutdown(asio::ip::tcp::socket::shutdown_both, ec);
socket.impl_.close(ec);
socket.has_closed_ = true;
}
async_simple::coro::Lazy<bool> timeout(
auto &timer, std::chrono::steady_clock::duration duration,
std::string msg) {
auto watcher = std::weak_ptr(socket_);
timer.expires_after(duration);
auto is_timeout = co_await timer.async_await();
if (!is_timeout) {
co_return false;
}
if (auto socket = watcher.lock(); socket) {
socket_->is_timeout_ = true;
CINATRA_LOG_WARNING << msg << " timeout";
close_socket(*socket_);
}
co_return true;
}
template <typename S>
bool has_schema(const S &url) {
size_t pos_http = url.find("http://");
size_t pos_https = url.find("https://");
size_t pos_ws = url.find("ws://");
size_t pos_wss = url.find("wss://");
bool has_http_scheme =
((pos_http != std::string::npos) && pos_http == 0) ||
((pos_https != std::string::npos) && pos_https == 0) ||
((pos_ws != std::string::npos) && pos_ws == 0) ||
((pos_wss != std::string::npos) && pos_wss == 0);
return has_http_scheme;
}
friend class multipart_reader_t<coro_http_client>;
http_parser parser_;
coro_io::ExecutorWrapper<> executor_wrapper_;
coro_io::period_timer timer_;
std::shared_ptr<socket_t> socket_;
asio::streambuf &head_buf_;
asio::streambuf &chunked_buf_;
std::string body_;
std::unordered_map<std::string, std::string> req_headers_;
std::string proxy_request_uri_ = "";
std::string proxy_host_;
std::string proxy_port_;
std::string proxy_basic_auth_username_;
std::string proxy_basic_auth_password_;
std::string proxy_bearer_token_auth_token_;
std::map<std::string, multipart_t> form_data_;
size_t max_single_part_size_ = 1024 * 1024;
std::string ws_sec_key_;
std::string host_;
std::string port_;
#ifdef CINATRA_ENABLE_SSL
std::unique_ptr<asio::ssl::context> ssl_ctx_ = nullptr;
bool has_init_ssl_ = false;
bool is_ssl_schema_ = false;
bool need_set_sni_host_ = true;
#endif
std::string redirect_uri_;
bool enable_follow_redirect_ = false;
bool enable_timeout_ = false;
std::chrono::steady_clock::duration conn_timeout_duration_ =
std::chrono::seconds(30);
std::chrono::steady_clock::duration req_timeout_duration_ =
std::chrono::seconds(60);
bool enable_tcp_no_delay_ = true;
std::string resp_chunk_str_;
std::span<char> out_buf_;
bool should_reset_ = false;
config config_;
bool enable_ws_deflate_ = false;
#ifdef CINATRA_ENABLE_GZIP
bool is_server_support_ws_deflate_ = false;
std::string inflate_str_;
#endif
content_encoding encoding_type_ = content_encoding::none;
int64_t max_http_body_len_ = MAX_HTTP_BODY_SIZE;
#if defined(CINATRA_ENABLE_BROTLI) || defined(CINATRA_ENABLE_GZIP)
std::string uncompressed_str_;
#endif
#ifdef BENCHMARK_TEST
bool stop_bench_ = false;
size_t total_len_ = 0;
#endif
#ifdef INJECT_FOR_HTTP_CLIENT_TEST
public:
bool write_failed_forever_ = false;
bool connect_timeout_forever_ = false;
bool parse_failed_forever_ = false;
bool read_failed_forever_ = false;
bool write_header_timeout_ = false;
bool write_payload_timeout_ = false;
bool read_timeout_ = false;
#endif
};
} // namespace cinatra