include/ylt/coro_io/client_pool.hpp (521 lines of code) (raw):

/* * Copyright (c) 2023, Alibaba Group Holding Limited; * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <async_simple/Executor.h> #include <async_simple/Promise.h> #include <async_simple/Try.h> #include <async_simple/Unit.h> #include <async_simple/coro/Lazy.h> #include <async_simple/coro/Sleep.h> #include <async_simple/coro/SpinLock.h> #include <asio/io_context.hpp> #include <asio/steady_timer.hpp> #include <atomic> #include <chrono> #include <cstddef> #include <cstdint> #include <deque> #include <exception> #include <memory> #include <mutex> #include <random> #include <shared_mutex> #include <string_view> #include <system_error> #include <thread> #include <type_traits> #include <unordered_map> #include <utility> #include <ylt/util/expected.hpp> #include "async_simple/Common.h" #include "async_simple/coro/Collect.h" #include "coro_io.hpp" #include "detail/client_queue.hpp" #include "io_context_pool.hpp" #include "ylt/easylog.hpp" #include "ylt/util/atomic_shared_ptr.hpp" namespace coro_io { template <typename client_t, typename io_context_pool_t> class client_pools; template <typename, typename> class load_balancer; template <typename client_t, typename io_context_pool_t = coro_io::io_context_pool> class client_pool : public std::enable_shared_from_this< client_pool<client_t, io_context_pool_t>> { using client_pools_t = client_pools<client_t, io_context_pool_t>; static async_simple::coro::Lazy<void> collect_idle_timeout_client( std::weak_ptr<client_pool> self_weak, coro_io::detail::client_queue<std::unique_ptr<client_t>>& clients, std::chrono::milliseconds sleep_time, std::size_t clear_cnt) { std::shared_ptr<client_pool> self = self_weak.lock(); if (self == nullptr) { co_return; } while (true) { clients.reselect(); self = nullptr; co_await coro_io::sleep_for(sleep_time); if ((self = self_weak.lock()) == nullptr) { break; } while (true) { ELOG_TRACE << "start collect timeout client of pool{" << self->host_name_ << "}, now client count: " << clients.size(); std::size_t is_all_cleared = clients.clear_old(clear_cnt); ELOG_TRACE << "finish collect timeout client of pool{" << self->host_name_ << "}, now client cnt: " << clients.size(); if (is_all_cleared != 0) [[unlikely]] { try { co_await async_simple::coro::Yield{}; } catch (std::exception& e) { ELOG_ERROR << "unexcepted yield exception: " << e.what(); } } else { break; } } --clients.collecter_cnt_; if (clients.size() == 0) { break; } std::size_t expected = 0; if (!clients.collecter_cnt_.compare_exchange_strong(expected, 1)) break; } co_return; } static auto rand_time(std::chrono::milliseconds ms) { static thread_local std::default_random_engine r; std::uniform_real_distribution e(1.0f, 1.2f); return std::chrono::milliseconds{static_cast<long>(e(r) * ms.count())}; } static async_simple::coro::Lazy<std::pair<bool, std::chrono::milliseconds>> reconnect_impl(std::unique_ptr<client_t>& client, std::shared_ptr<client_pool>& self) { auto pre_time_point = std::chrono::steady_clock::now(); auto dns_cache_update_duration = self->pool_config_.dns_cache_update_duration; std::vector<asio::ip::tcp::endpoint>* eps_raw_ptr = nullptr; std::shared_ptr<std::vector<asio::ip::tcp::endpoint>> eps_ptr; std::vector<asio::ip::tcp::endpoint> eps; uint64_t old_tp; if (dns_cache_update_duration.count() >= 0) { eps_ptr = self->eps_.load(std::memory_order_acquire); eps_raw_ptr = eps_ptr.get(); old_tp = self->timepoint_.load(std::memory_order_acquire); auto old_time_point = std::chrono::steady_clock::time_point{ std::chrono::steady_clock::duration{old_tp}}; // check if no dns cache, or cache outdated if (eps_raw_ptr->empty() || (pre_time_point - old_time_point) > dns_cache_update_duration) [[unlikely]] { // start resolve, store result to eps eps_raw_ptr = &eps; } // else, we can used cached eps } auto result = co_await client->connect(self->host_name_, eps_raw_ptr); bool ok = client_t::is_ok(result); if (dns_cache_update_duration.count() >= 0) { if ((!ok && (eps_raw_ptr != &eps)) // use cache but request failed, clear cache || (ok && (eps_raw_ptr == &eps))) // don't have cache request ok, set cache { if (self->timepoint_.compare_exchange_strong( old_tp, pre_time_point.time_since_epoch().count(), std::memory_order_release)) { auto new_eps_ptr = std::make_shared<std::vector<asio::ip::tcp::endpoint>>( std::move(eps)); self->eps_.store(std::move(new_eps_ptr), std::memory_order_release); } } } auto post_time_point = std::chrono::steady_clock::now(); auto cost_time = std::chrono::duration_cast<std::chrono::milliseconds>( post_time_point - pre_time_point); ELOG_TRACE << "reconnect client{" << client.get() << "}" << "is success:" << ok << ", cost time: " << cost_time / std::chrono::milliseconds{1} << "ms"; co_return std::pair{ok, cost_time}; } static async_simple::coro::Lazy<void> reconnect( std::unique_ptr<client_t>& client, std::weak_ptr<client_pool> watcher) { using namespace std::chrono_literals; std::shared_ptr<client_pool> self = watcher.lock(); uint32_t i = UINT32_MAX; // (at least connect once) do { ELOG_TRACE << "try to reconnect client{" << client.get() << "},host:{" << client->get_host() << ":" << client->get_port() << "}, try count:" << i + 1 << "max retry limit:" << self->pool_config_.connect_retry_count; auto [ok, cost_time] = co_await reconnect_impl(client, self); if (ok) { ELOG_TRACE << "reconnect client{" << client.get() << "} success"; co_return; } ELOG_TRACE << "reconnect client{" << client.get() << "} failed. If client close:{" << client->has_closed() << "}"; auto wait_time = rand_time( (self->pool_config_.reconnect_wait_time - cost_time) / 1ms * 1ms); self = nullptr; if (wait_time.count() > 0) co_await coro_io::sleep_for(wait_time, &client->get_executor()); self = watcher.lock(); ++i; } while (i < self->pool_config_.connect_retry_count); ELOG_WARN << "reconnect client{" << client.get() << "},host:{" << client->get_host() << ":" << client->get_port() << "} out of max limit, stop retry. connect failed"; alive_detect(client->get_config(), std::move(self)).start([](auto&&) { }); client = nullptr; } static async_simple::coro::Lazy<void> alive_detect( const typename client_t::config& client_config, std::weak_ptr<client_pool> watcher) { std::shared_ptr<client_pool> self = watcher.lock(); using namespace std::chrono_literals; if (self && self->pool_config_.host_alive_detect_duration.count() != 0 && self->free_client_count() == 0) { bool expected = true; if (!self->is_alive_.compare_exchange_strong( expected, false)) { // other alive detect coroutine is running. co_return; } if (self->free_client_count() > 0) { // recheck for multi-thread self->is_alive_ = true; co_return; } auto executor = self->io_context_pool_.get_executor(); auto client = std::make_unique<client_t>(*executor); if (!client->init_config(client_config)) AS_UNLIKELY { ELOG_ERROR << "Init client config failed in host alive detect. That " "is not expected."; co_return; } while (true) { auto [ok, cost_time] = co_await reconnect_impl(client, self); if (ok) { ELOG_TRACE << "reconnect client{" << client.get() << "} success. stop alive detect."; self->collect_free_client(std::move(client)); self->is_alive_ = true; /*if client close(), we still mark it as alive*/ co_return; } if (self->is_alive_) { ELOG_TRACE << "client pool is aliving, stop connect client {" << client.get() << "} for alive detect"; co_return; } ELOG_TRACE << "reconnect client{" << client.get() << "} failed. continue alive detect."; auto wait_time = rand_time( (self->pool_config_.host_alive_detect_duration - cost_time) / 1ms * 1ms); self = nullptr; if (wait_time.count() > 0) { co_await coro_io::sleep_for(wait_time, &client->get_executor()); } self = watcher.lock(); if (self->is_alive_) { ELOG_TRACE << "client pool is aliving, stop connect client {" << client.get() << "} for alive detect"; co_return; } } } } async_simple::coro::Lazy<std::unique_ptr<client_t>> get_client( const typename client_t::config& client_config) { std::unique_ptr<client_t> client; free_clients_.try_dequeue(client); if (!client) { short_connect_clients_.try_dequeue(client); } if (client == nullptr) { std::unique_ptr<client_t> cli; auto executor = io_context_pool_.get_executor(); client = std::make_unique<client_t>(*executor); if (!client->init_config(client_config)) AS_UNLIKELY { ELOG_ERROR << "init client config failed."; co_return nullptr; } co_await reconnect(client, this->weak_from_this()); } else { ELOG_TRACE << "get free client{" << client.get() << "}. from queue"; } co_return std::move(client); } void enqueue( coro_io::detail::client_queue<std::unique_ptr<client_t>>& clients, std::unique_ptr<client_t> client, std::chrono::milliseconds collect_time) { if (clients.enqueue(std::move(client)) == 1) { std::size_t expected = 0; if (clients.collecter_cnt_.compare_exchange_strong(expected, 1)) { ELOG_TRACE << "start timeout client collecter of client_pool{" << host_name_ << "}"; collect_idle_timeout_client( this->weak_from_this(), clients, (std::max)(collect_time, std::chrono::milliseconds{50}), pool_config_.idle_queue_per_max_clear_count) .directlyStart([](auto&&) { },coro_io::get_global_executor()); } } } void collect_free_client(std::unique_ptr<client_t> client) { if (!client->has_closed()) { if (free_clients_.size() < pool_config_.max_connection) { ELOG_TRACE << "collect free client{" << client.get() << "} enqueue"; enqueue(free_clients_, std::move(client), pool_config_.idle_timeout); } else { ELOG_TRACE << "out of max connection limit <<" << pool_config_.max_connection << ", collect free client{" << client.get() << "} enqueue short connect queue"; enqueue(short_connect_clients_, std::move(client), pool_config_.short_connect_idle_timeout); } is_alive_ = true; } else { ELOG_TRACE << "client{" << client.get() << "} is closed. we won't collect it"; } return; }; template <typename T> struct lazy_hacker { using type = void; }; template <typename T> struct lazy_hacker<async_simple::coro::Lazy<T>> { using type = T; }; template <typename T> using return_type = ylt::expected<typename lazy_hacker<decltype(std::declval<T>()( std::declval<client_t&>()))>::type, std::errc>; template <typename T> using return_type_with_host = ylt::expected<typename lazy_hacker<decltype(std::declval<T>()( std::declval<client_t&>(), std::string_view{}))>::type, std::errc>; public: struct pool_config { uint32_t max_connection = 100; uint32_t connect_retry_count = 3; uint32_t idle_queue_per_max_clear_count = 1000; std::chrono::milliseconds reconnect_wait_time{1000}; std::chrono::milliseconds idle_timeout{30000}; std::chrono::milliseconds short_connect_idle_timeout{1000}; std::chrono::milliseconds host_alive_detect_duration{ 30000}; /* zero means wont detect */ typename client_t::config client_config; std::chrono::seconds dns_cache_update_duration{5 * 60}; // 5mins }; private: struct private_construct_token {}; public: static std::shared_ptr<client_pool> create( std::string_view host_name, const pool_config& pool_config = {}, io_context_pool_t& io_context_pool = coro_io::g_io_context_pool()) { return std::make_shared<client_pool>(private_construct_token{}, host_name, pool_config, io_context_pool); } client_pool(private_construct_token t, std::string_view host_name, const pool_config& pool_config, io_context_pool_t& io_context_pool) : host_name_(host_name), pool_config_(pool_config), io_context_pool_(io_context_pool), free_clients_(pool_config.max_connection), eps_(std::make_shared<std::vector<asio::ip::tcp::endpoint>>()){}; client_pool(private_construct_token t, client_pools_t* pools_manager_, std::string_view host_name, const pool_config& pool_config, io_context_pool_t& io_context_pool) : pools_manager_(pools_manager_), host_name_(host_name), pool_config_(pool_config), io_context_pool_(io_context_pool), free_clients_(pool_config.max_connection), eps_(std::make_shared<std::vector<asio::ip::tcp::endpoint>>()){}; template <typename T> async_simple::coro::Lazy<return_type<T>> send_request( T op, typename client_t::config& client_config) { // return type: Lazy<expected<T::returnType,std::errc>> ELOG_TRACE << "try send request to " << host_name_; auto client = co_await get_client(client_config); if (!client) { ELOG_WARN << "send request to " << host_name_ << " failed. connection refused."; co_return return_type<T>{ylt::unexpect, std::errc::connection_refused}; } if constexpr (std::is_same_v<typename return_type<T>::value_type, void>) { co_await op(*client); collect_free_client(std::move(client)); co_return return_type<T>{}; } else { auto ret = co_await op(*client); collect_free_client(std::move(client)); co_return std::move(ret); } } template <typename T> decltype(auto) send_request(T op) { return send_request(std::move(op), pool_config_.client_config); } /** * @brief approx connection of client pools * * @return std::size_t */ std::size_t free_client_count() const noexcept { return free_clients_.size() + short_connect_clients_.size(); } /** * @brief if host may not useable now. * * @return bool */ bool is_alive() const noexcept { return is_alive_; } /** * @brief approx connection of client pools * * @return std::size_t */ std::size_t size() const noexcept { return free_client_count(); } std::string_view get_host_name() const noexcept { return host_name_; } std::shared_ptr<std::vector<asio::ip::tcp::endpoint>> get_remote_endpoints() const noexcept { return eps_.load(std::memory_order_acquire); } private: template <typename, typename> friend class client_pools; template <typename, typename> friend class load_balancer; template <typename T> async_simple::coro::Lazy<return_type_with_host<T>> send_request( T op, std::string_view endpoint, typename client_t::config& client_config) { // return type: Lazy<expected<T::returnType,std::errc>> ELOG_TRACE << "try send request to " << endpoint; auto client = co_await get_client(client_config); if (!client) { ELOG_WARN << "send request to " << endpoint << " failed. connection refused."; co_return return_type_with_host<T>{ylt::unexpect, std::errc::connection_refused}; } if constexpr (std::is_same_v<typename return_type_with_host<T>::value_type, void>) { co_await op(*client, endpoint); collect_free_client(std::move(client)); co_return return_type_with_host<T>{}; } else { auto ret = co_await op(*client, endpoint); collect_free_client(std::move(client)); co_return std::move(ret); } } template <typename T> decltype(auto) send_request(T op, std::string_view sv) { return send_request(std::move(op), sv, pool_config_.client_config); } coro_io::detail::client_queue<std::unique_ptr<client_t>> free_clients_; coro_io::detail::client_queue<std::unique_ptr<client_t>> short_connect_clients_; client_pools_t* pools_manager_ = nullptr; async_simple::Promise<async_simple::Unit> idle_timeout_waiter; std::string host_name_; pool_config pool_config_; io_context_pool_t& io_context_pool_; std::atomic<bool> is_alive_ = true; std::atomic<uint64_t> timepoint_; ylt::util::atomic_shared_ptr<std::vector<asio::ip::tcp::endpoint>> eps_; }; template <typename client_t, typename io_context_pool_t = coro_io::io_context_pool> class client_pools { using client_pool_t = client_pool<client_t, io_context_pool_t>; public: client_pools( const typename client_pool_t::pool_config& pool_config = {}, io_context_pool_t& io_context_pool = coro_io::g_io_context_pool()) : io_context_pool_(io_context_pool), default_pool_config_(pool_config) {} auto send_request(std::string_view host_name, auto op) -> decltype(std::declval<client_pool_t>().send_request(std::move(op))) { auto pool = get_client_pool(host_name, default_pool_config_); auto ret = co_await pool->send_request(std::move(op)); co_return ret; } auto send_request(std::string_view host_name, typename client_pool_t::pool_config& pool_config, auto op) -> decltype(std::declval<client_pool_t>().send_request(std::move(op))) { auto pool = get_client_pool(host_name, pool_config); auto ret = co_await pool->send_request(std::move(op)); co_return ret; } auto at(std::string_view host_name) { return get_client_pool(host_name, default_pool_config_); } auto at(std::string_view host_name, const typename client_pool_t::pool_config& pool_config) { return get_client_pool(host_name, pool_config); } auto operator[](std::string_view host_name) { return at(host_name); } auto get_io_context_pool() { return io_context_pool_; } private: std::shared_ptr<client_pool_t> get_client_pool( std::string_view host_name, const typename client_pool_t::pool_config& pool_config) { decltype(client_pool_manager_.end()) iter; bool has_inserted; { #ifdef __cpp_lib_generic_unordered_lookup std::shared_lock shared_lock{mutex_}; iter = client_pool_manager_.find(host_name); #else std::string host_name_copy = std::string{host_name}; std::shared_lock shared_lock{mutex_}; iter = client_pool_manager_.find(host_name_copy); #endif if (iter == client_pool_manager_.end()) { shared_lock.unlock(); auto pool = std::make_shared<client_pool_t>( typename client_pool_t::private_construct_token{}, this, host_name, pool_config, io_context_pool_); { std::lock_guard lock{mutex_}; std::tie(iter, has_inserted) = client_pool_manager_.emplace(host_name, nullptr); if (has_inserted) { iter->second = pool; } } } return iter->second; } } struct string_hash { using hash_type = std::hash<std::string_view>; using is_transparent = void; std::size_t operator()(std::string_view str) const { return hash_type{}(str); } std::size_t operator()(std::string const& str) const { return hash_type{}(str); } }; typename client_pool_t::pool_config default_pool_config_{}; std::unordered_map<std::string, std::shared_ptr<client_pool_t>, string_hash, std::equal_to<>> client_pool_manager_; io_context_pool_t& io_context_pool_; std::shared_mutex mutex_; }; template <typename client_t, typename io_context_pool_t = coro_io::io_context_pool> inline client_pools<client_t, io_context_pool_t>& g_clients_pool( const typename client_pool<client_t, io_context_pool_t>::pool_config& pool_config = {}, io_context_pool_t& io_context_pool = coro_io::g_io_context_pool()) { static client_pools<client_t, io_context_pool_t> _g_clients_pool( pool_config, io_context_pool); return _g_clients_pool; } } // namespace coro_io