include/photon/common/throttle.h (117 lines of code) (raw):

#pragma once #include <photon/thread/thread.h> #include <photon/common/utility.h> namespace photon { class throttle { protected: photon::semaphore sem; uint64_t last_retrieve = 0; uint64_t m_limit = -1UL; uint64_t m_limit_per_slice = -1UL; uint64_t m_time_window; uint64_t m_time_window_per_slice; uint64_t m_slice_num; void try_signal() { auto duration = photon::now - last_retrieve; if (duration > m_time_window) duration = m_time_window; if (duration >= m_time_window_per_slice) { auto free = m_limit_per_slice * (duration / m_time_window_per_slice); auto current = photon::sat_sub(m_limit, sem.count()); if (current < free) { free = current; } sem.signal(free); last_retrieve = photon::now; } } public: /** * @param limit -1UL means no limit, 0 means lowest speed (hang) */ explicit throttle(uint64_t limit, uint64_t time_window = 1000UL * 1000, uint64_t slice = 100) : m_slice_num(slice) { update(limit); // Equals to DIV_ROUND_UP m_time_window_per_slice = photon::sat_add(time_window, (m_slice_num - 1)) / m_slice_num; m_time_window = m_time_window_per_slice * m_slice_num; for (auto& each: m_starving_slice_num) each = 0; int i = 0; for (auto& each: m_starving_slice_percent) each = get_starving_percent(Priority(i++)); sem.signal(m_limit); } enum class Priority { High, Medium, Low, NumPriorities, }; void update(uint64_t limit) { // Equals to DIV_ROUND_UP m_limit_per_slice = photon::sat_add(limit, (m_slice_num - 1)) / m_slice_num; m_limit = m_limit_per_slice * m_slice_num; } int consume(uint64_t amount, Priority prio = Priority::High) { uint64_t fulfil_percent = get_fulfill_percent(prio); uint64_t starving_percent = m_starving_slice_percent[int(prio)]; // TODO: Handle the situation when throttle limit is extremely low assert(amount < m_limit); int ret = -1; int err = ETIMEDOUT; do { try_signal(); auto& starving_slice_num = m_starving_slice_num[int(prio)]; if (starving_slice_num * 100 > m_slice_num * starving_percent) { // Avoid the high priority requests consumed all tokens, // and make the low priority ones starving. starving_slice_num = 0; goto break_starving; } if (sem.count() * 100 < m_limit * fulfil_percent) { // Request are fulfilled only if they saw enough percent of tokens, // otherwise wait a `time_window_per_slice`. int sleep_ret = photon::thread_usleep(m_time_window_per_slice); if (sleep_ret != 0) { // Interrupted, just return return -1; } starving_slice_num++; continue; } break_starving: ret = sem.wait_interruptible(amount, m_time_window_per_slice); err = errno; } while (ret < 0 && err == ETIMEDOUT); if (ret < 0) { errno = err; return ret; } return 0; } int try_consume(uint64_t amount) { try_signal(); return sem.wait(amount, 0); } void restore(uint64_t amount) { auto free = amount; auto current = photon::sat_sub(m_limit, sem.count()); if (current < free) free = current; sem.signal(free); } protected: // High priority is actually realtime static uint64_t get_fulfill_percent(Priority prio) { assert(prio < Priority::NumPriorities); switch (prio) { case Priority::Low: return 60; case Priority::Medium: return 30; case Priority::High: default: return 0; } } static uint64_t get_starving_percent(Priority prio) { assert(prio < Priority::NumPriorities); switch (prio) { case Priority::Low: return 20; case Priority::Medium: return 10; case Priority::High: default: return 0; } } uint64_t m_starving_slice_num[int(Priority::NumPriorities)] = {}; uint64_t m_starving_slice_percent[int(Priority::NumPriorities)] = {}; }; } // namespace photon