Throttler.cpp (237 lines of code) (raw):
/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <wdt/Throttler.h>
#include <wdt/ErrorCodes.h>
#include <wdt/WdtOptions.h>
namespace facebook {
namespace wdt {
// Constants for different calculations
const int64_t kMillisecsPerSec = 1000;
const double kPeakMultiplier = 1.2;
const int kBucketMultiplier = 2;
const double kTimeMultiplier = 0.25;
std::shared_ptr<Throttler> Throttler::makeThrottler(
const ThrottlerOptions& options) {
return std::make_shared<Throttler>(options);
}
void Throttler::configureOptions(double& avgRatePerSec, double& peakRatePerSec,
double& bucketLimit) {
if (peakRatePerSec < avgRatePerSec && peakRatePerSec >= 0) {
WLOG(WARNING) << "Per thread peak rate should be greater "
<< "than per thread average rate. "
<< "Making peak rate 1.2 times the average rate";
peakRatePerSec = kPeakMultiplier * (double)avgRatePerSec;
}
if (bucketLimit <= 0 && peakRatePerSec > 0) {
bucketLimit = kTimeMultiplier * kBucketMultiplier * peakRatePerSec;
WLOG(INFO) << "Burst limit not specified but peak "
<< "rate is configured. Auto configuring to " << bucketLimit;
}
}
Throttler::Throttler(const ThrottlerOptions& options)
: avgRatePerSec_(options.avg_rate_per_sec) {
bucketRatePerSec_ = options.max_rate_per_sec;
tokenBucketLimit_ = kTimeMultiplier * kBucketMultiplier * bucketRatePerSec_;
/* We keep the number of tokens generated as zero initially
* It could be argued that we keep this filled when we created the
* bucket. However the startTime is passed in this case and the hope is
* that we will have enough number of tokens by the time we send the data
*/
tokenBucket_ = 0;
if (options.throttler_bucket_limit > 0) {
tokenBucketLimit_ = options.throttler_bucket_limit;
}
if (avgRatePerSec_ > 0) {
WLOG(INFO) << "Average rate " << avgRatePerSec_;
} else {
WLOG(INFO) << "No average rate specified";
}
if (bucketRatePerSec_ > 0) {
WLOG(INFO) << "Peak rate " << bucketRatePerSec_ << ". Bucket limit "
<< tokenBucketLimit_;
} else {
WLOG(INFO) << "No peak rate specified";
}
WDT_CHECK_GT(options.single_request_limit, 0);
singleRequestLimit_ = options.single_request_limit;
throttlerLogTimeMillis_ = options.throttler_log_time_millis;
}
void Throttler::setThrottlerRates(double& avgRatePerSec,
double& bucketRatePerSec,
double& tokenBucketLimit) {
// configureOptions will change the rates in case they don't make
// sense
configureOptions(avgRatePerSec, bucketRatePerSec, tokenBucketLimit);
std::unique_lock lock(throttlerMutex_);
resetState();
WLOG(INFO) << "Updating the rates avgRatePerSec : " << avgRatePerSec
<< " bucketRatePerSec : " << bucketRatePerSec
<< " tokenBucketLimit : " << tokenBucketLimit;
avgRatePerSec_ = avgRatePerSec;
bucketRatePerSec_ = bucketRatePerSec;
tokenBucketLimit_ = tokenBucketLimit;
}
void Throttler::setThrottlerRates(const ThrottlerOptions& options) {
double avgRatePerSec = options.avg_rate_per_sec;
double peakRatePerSec = options.max_rate_per_sec;
double bucketLimit = options.throttler_bucket_limit;
setThrottlerRates(avgRatePerSec, peakRatePerSec, bucketLimit);
}
void Throttler::limit(ThreadCtx& threadCtx, int64_t deltaProgress) {
limitInternal(&threadCtx, deltaProgress);
}
void Throttler::limit(int64_t deltaProgress) {
limitInternal(nullptr, deltaProgress);
}
void Throttler::limitInternal(ThreadCtx* threadCtx, int64_t deltaProgress) {
const int kLogInterval = 100;
int64_t numThrottled = 0;
int64_t count = 0;
while (numThrottled < deltaProgress) {
const int64_t toThrottle =
std::min(singleRequestLimit_, deltaProgress - numThrottled);
limitSingleRequest(threadCtx, toThrottle);
numThrottled += toThrottle;
count++;
if (count % kLogInterval == 0) {
WLOG(INFO) << "Throttling large amount data, to-throttle: "
<< deltaProgress << ", num-throttled: " << numThrottled;
}
}
}
void Throttler::limitSingleRequest(ThreadCtx* threadCtx,
int64_t deltaProgress) {
WDT_CHECK_LE(deltaProgress, singleRequestLimit_);
std::chrono::time_point<Clock> now = Clock::now();
double sleepTimeSeconds = calculateSleep(deltaProgress, now);
if (throttlerLogTimeMillis_ > 0) {
printPeriodicLogs(now, deltaProgress);
}
if (sleepTimeSeconds <= 0) {
return;
}
if (threadCtx == nullptr) {
sleep(sleepTimeSeconds);
return;
}
PerfStatCollector statCollector(*threadCtx, PerfStatReport::THROTTLER_SLEEP);
sleep(sleepTimeSeconds);
}
void Throttler::sleep(double sleepTimeSecs) const {
/* sleep override */
std::this_thread::sleep_for(std::chrono::duration<double>(sleepTimeSecs));
}
double Throttler::calculateSleep(double deltaProgress,
const Clock::time_point& now) {
std::unique_lock lock(throttlerMutex_);
if (refCount_ <= 0) {
WLOG(ERROR) << "Using the throttler without registering the transfer";
return -1;
}
progress_ += deltaProgress;
double avgThrottlerSleep = averageThrottler(now);
const bool willSleep = (avgThrottlerSleep > 0);
if (willSleep) {
return avgThrottlerSleep;
}
// we still hold the lock if peak throttler can come into effect
if ((bucketRatePerSec_ > 0) && (tokenBucketLimit_ > 0)) {
std::chrono::duration<double> elapsedDuration = now - lastFillTime_;
lastFillTime_ = now;
double elapsedSeconds = elapsedDuration.count();
tokenBucket_ += elapsedSeconds * bucketRatePerSec_;
if (tokenBucket_ > tokenBucketLimit_) {
tokenBucket_ = tokenBucketLimit_;
}
tokenBucket_ -= deltaProgress;
if (tokenBucket_ < 0) {
/*
* If we have negative number of tokens lets sleep
* This way we will have positive number of tokens next time
*/
double peakThrottlerSleep = -1.0 * tokenBucket_ / bucketRatePerSec_;
WVLOG(2) << "Peak throttler wants to sleep " << peakThrottlerSleep
<< " seconds";
return peakThrottlerSleep;
}
}
return -1;
}
void Throttler::printPeriodicLogs(const Clock::time_point& now,
double deltaProgress) {
/*
* This is the part where throttler prints out the progress
* made periodically.
*/
std::chrono::duration<double> elapsedLogDuration;
std::unique_lock lock(throttlerMutex_);
instantProgress_ += deltaProgress;
elapsedLogDuration = now - lastLogTime_;
double elapsedLogSeconds = elapsedLogDuration.count();
if (elapsedLogSeconds * kMillisecsPerSec >= throttlerLogTimeMillis_) {
double instantRatePerSec = 0;
instantRatePerSec = instantProgress_ / elapsedLogSeconds;
instantProgress_ = 0;
lastLogTime_ = now;
std::chrono::duration<double> elapsedAvgDuration = now - startTime_;
double elapsedAvgSeconds = elapsedAvgDuration.count();
double avgRatePerSec = progress_ / elapsedAvgSeconds;
WLOG(INFO) << "Throttler:Transfer_Rates::"
<< " " << elapsedAvgSeconds << " " << avgRatePerSec << " "
<< instantRatePerSec << " " << deltaProgress;
}
}
double Throttler::averageThrottler(const Clock::time_point& now) {
std::chrono::duration<double> elapsedDuration = now - startTime_;
double elapsedSeconds = elapsedDuration.count();
if (avgRatePerSec_ <= 0) {
WVLOG(2) << "There is no avg rate limit";
return -1;
}
const double allowedProgress = avgRatePerSec_ * elapsedSeconds;
if (progress_ > allowedProgress) {
double idealTime = progress_ / avgRatePerSec_;
const double sleepTimeSeconds = idealTime - elapsedSeconds;
WVLOG(1) << "Throttler : Elapsed " << elapsedSeconds
<< " seconds. Made progress " << progress_ << " in "
<< elapsedSeconds
<< " seconds, maximum allowed progress for this duration is "
<< allowedProgress << ". Mean Rate allowed is " << avgRatePerSec_
<< " . Sleeping for " << sleepTimeSeconds << " seconds";
return sleepTimeSeconds;
}
return -1;
}
void Throttler::startTransfer() {
std::unique_lock lock(throttlerMutex_);
if (refCount_ == 0) {
resetState();
}
refCount_++;
}
void Throttler::resetState() {
startTime_ = Clock::now();
lastFillTime_ = startTime_;
lastLogTime_ = startTime_;
instantProgress_ = 0;
progress_ = 0;
tokenBucket_ = 0;
}
void Throttler::endTransfer() {
std::unique_lock lock(throttlerMutex_);
WDT_CHECK(refCount_ > 0);
refCount_--;
}
double Throttler::getProgress() {
std::unique_lock lock(throttlerMutex_);
return progress_;
}
double Throttler::getAvgRatePerSec() {
std::unique_lock lock(throttlerMutex_);
return avgRatePerSec_;
}
double Throttler::getPeakRatePerSec() {
std::unique_lock lock(throttlerMutex_);
return bucketRatePerSec_;
}
double Throttler::getBucketLimit() {
std::unique_lock lock(throttlerMutex_);
return tokenBucketLimit_;
}
int64_t Throttler::getThrottlerLogTimeMillis() {
std::unique_lock lock(throttlerMutex_);
return throttlerLogTimeMillis_;
}
void Throttler::setThrottlerLogTimeMillis(int64_t throttlerLogTimeMillis) {
std::unique_lock lock(throttlerMutex_);
throttlerLogTimeMillis_ = throttlerLogTimeMillis;
}
std::ostream& operator<<(std::ostream& stream, const Throttler& throttler) {
stream << "avgRate: " << throttler.avgRatePerSec_
<< ", peakRate: " << throttler.bucketRatePerSec_
<< ", bucketLimit: " << throttler.tokenBucketLimit_
<< ", throttlerLogTimeMillis: " << throttler.throttlerLogTimeMillis_;
return stream;
}
}
}