include/ylt/coro_io/rate_limiter.hpp (131 lines of code) (raw):
/*
* Copyright (c) 2023, Alibaba Group Holding Limited;
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <async_simple/coro/Lazy.h>
#include <async_simple/coro/SpinLock.h>
#include <async_simple/coro/SyncAwait.h>
#include <algorithm>
#include <chrono>
#include <iostream>
#include <ylt/coro_io/coro_io.hpp>
#include <ylt/easylog.hpp>
namespace coro_io {
class rate_limiter {
public:
async_simple::coro::Lazy<std::chrono::milliseconds> acquire(int permits) {
std::chrono::milliseconds wait_mills;
{
auto scope = co_await this->lock_.coScopedLock();
wait_mills = reserve_and_get_wait_length(permits, current_time_mills());
}
co_await coro_io::sleep_for(wait_mills);
co_return wait_mills;
}
async_simple::coro::Lazy<void> set_rate(double permitsPerSecond) {
auto scope = co_await this->lock_.coScopedLock();
do_set_rate(permitsPerSecond, current_time_mills());
}
virtual ~rate_limiter() {}
protected:
virtual void do_set_rate(
double permitsPerSecond,
std::chrono::steady_clock::time_point now_micros) = 0;
virtual std::chrono::steady_clock::time_point reserve_earliest_available(
int permits, std::chrono::steady_clock::time_point now_micros) = 0;
std::chrono::steady_clock::time_point current_time_mills() {
return std::chrono::steady_clock::now();
}
private:
std::chrono::milliseconds reserve_and_get_wait_length(
int permits, std::chrono::steady_clock::time_point now_micros) {
std::chrono::steady_clock::time_point moment_available =
reserve_earliest_available(permits, now_micros);
std::chrono::milliseconds diff_mills =
std::chrono::duration_cast<std::chrono::milliseconds>(moment_available -
now_micros);
return std::max(diff_mills, std::chrono::milliseconds(0));
}
async_simple::coro::SpinLock lock_;
};
class abstract_smooth_rate_limiter : public rate_limiter {
public:
virtual ~abstract_smooth_rate_limiter() {}
protected:
virtual void do_set_rate(double permits_per_second,
double stable_internal_micros) = 0;
virtual std::chrono::milliseconds stored_permits_to_wait_time(
double stored_permits, double permits_to_take) = 0;
virtual double cool_down_internal_micros() = 0;
void resync(std::chrono::steady_clock::time_point now_micros) {
// if next_free_ticket is in the past, resync to now
ELOG_DEBUG << "now micros: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(
now_micros.time_since_epoch())
.count()
<< ", next_free_ticket_micros_: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(
this->next_free_ticket_micros_.time_since_epoch())
.count();
if (now_micros > this->next_free_ticket_micros_) {
std::chrono::milliseconds diff_mills =
std::chrono::duration_cast<std::chrono::milliseconds>(
now_micros - this->next_free_ticket_micros_);
double newPermits = diff_mills.count() / cool_down_internal_micros();
this->stored_permits_ =
std::min(this->max_permits_, this->stored_permits_ + newPermits);
this->next_free_ticket_micros_ = now_micros;
}
}
void do_set_rate(double permits_per_second,
std::chrono::steady_clock::time_point now_micros) override {
resync(now_micros);
double stable_internal_micros = 1000 / permits_per_second;
this->stable_internal_micros_ = stable_internal_micros;
do_set_rate(permits_per_second, stable_internal_micros);
}
std::chrono::steady_clock::time_point reserve_earliest_available(
int required_permits,
std::chrono::steady_clock::time_point now_micros) override {
resync(now_micros);
std::chrono::steady_clock::time_point return_value =
this->next_free_ticket_micros_;
double stored_permits_to_spend =
std::min((double)required_permits, this->stored_permits_);
double fresh_permits = required_permits - stored_permits_to_spend;
std::chrono::milliseconds wait_micros =
stored_permits_to_wait_time(this->stored_permits_,
stored_permits_to_spend) +
std::chrono::milliseconds(
(int64_t)(fresh_permits * this->stable_internal_micros_));
this->next_free_ticket_micros_ += wait_micros;
this->stored_permits_ -= stored_permits_to_spend;
return return_value;
}
/**
* The currently stored permits.
*/
double stored_permits_ = 0;
/**
* The maximum number of stored permits.
*/
double max_permits_ = 0;
/**
* The interval between two unit requests, at our stable rate. E.g., a stable
* rate of 5 permits per second has a stable internal of 200ms.
*/
double stable_internal_micros_ = 0;
/**
* The time when the next request (no matter its size) will be granted. After
* granting a request, this is pushed further in the future. Large requests
* push this further than small requests.
*/
std::chrono::steady_clock::time_point next_free_ticket_micros_;
};
class smooth_bursty_rate_limiter : public abstract_smooth_rate_limiter {
public:
smooth_bursty_rate_limiter(double permits_per_second) {
this->max_burst_seconds_ = 1.0;
async_simple::coro::syncAwait(set_rate(permits_per_second));
}
protected:
void do_set_rate(double permits_per_second, double stable_internal_micros) {
double old_max_permits = this->max_permits_;
this->max_permits_ = this->max_burst_seconds_ * permits_per_second;
this->stored_permits_ =
(0 == old_max_permits)
? 0
: this->stored_permits_ * this->max_permits_ / old_max_permits;
ELOG_DEBUG << "max_permits_: " << this->max_permits_
<< ", stored_permits_:" << this->stored_permits_;
}
std::chrono::milliseconds stored_permits_to_wait_time(
double stored_permits, double permits_to_take) {
return std::chrono::milliseconds(0);
}
double cool_down_internal_micros() { return this->stable_internal_micros_; }
private:
/**
* The work(permits) of how many seconds can be saved up if the rate_limiter
* is unused.
*/
double max_burst_seconds_ = 0;
};
} // namespace coro_io