Throttler.h (65 lines of code) (raw):
/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <folly/SpinLock.h>
#include <glog/logging.h>
#include <wdt/util/CommonImpl.h>
#include <thread>
namespace facebook {
namespace wdt {
struct ThrottlerOptions {
/**
* Rate at which we would like to throttle,
* specifying this as < 0 makes it unlimited
*/
double avg_rate_per_sec{-1};
/**
* Rate at which tokens will be generated in TB algorithm.
* When we lag behind, this will allow us to go faster,
* specify as < 0 to make it unlimited. Specify as 0
* for auto configuring.
* auto conf as (kPeakMultiplier * avgRatePerSec_)
* Find details in Throttler.cpp
*/
double max_rate_per_sec{0};
/**
* Maximum bucket size of TB algorithm.
* This together with maxRatePerSec_ will
* make for maximum burst rate. If specified as 0 it is
* auto configured in Throttler.cpp
*/
double throttler_bucket_limit{0};
/**
* Throttler logs statistics like average and max rate.
* This option specifies on how frequently should those
* be logged
*/
int64_t throttler_log_time_millis{0};
/**
* Limit for single request. If the requested size is greater than this, the
* request is broken in chunks of this size and processed sequentially.
* This ensures that a thread asking for large amount of resource does not
* starve other threads asking for small amount of resource.
*/
int64_t single_request_limit{1};
};
/**
* Attempts to limit the rate in two ways.
* 1. Limit average rate by calling averageThrottler()
* 2. Limit the peak rate by calling limitByTokenBucket
* Generally average throttler would be maintaining the rate to avgRate_
* although at times the actual rate might fall behind and in those
* circumstances the rate at which you will catch up is limited
* with respect to the peak rate and the bucket limit using the
* token bucket algorithm.
* Token Bucket algorithm can be found on
* http://en.wikipedia.org/wiki/Token_bucket
*/
class Throttler {
public:
/// Utility method that makes throttler using options.
static std::shared_ptr<Throttler> makeThrottler(
const ThrottlerOptions& options);
explicit Throttler(const ThrottlerOptions& options);
/**
* Calls calculateSleep which is a thread safe method. Finds out the
* time thread has to sleep and makes it sleep.
* Also calls the throttler logger to log the stats
*/
virtual void limit(ThreadCtx& threadCtx, int64_t deltaProgress);
/**
* Same as the other limit, but without reporting for sleep duration
*/
virtual void limit(int64_t deltaProgress);
/**
* This is thread safe implementation of token bucket
* algorithm. Bucket is filled at the rate of bucketRatePerSec_
* till the limit of tokenBucketLimit_
* There is no sleep, we just calculate how much to sleep.
* This method also calls the averageThrottler inside
* @param deltaProgress Progress since the last limit call
*/
virtual double calculateSleep(double totalProgress,
const Clock::time_point& now);
/// Provides anyone using this throttler instance to update the throttler
/// rates. The rates might be configured to different values than what
/// were passed.
virtual void setThrottlerRates(double& avgRatePerSec,
double& bucketRatePerSec,
double& tokenBucketLimit);
/// Utility method that set throttler rate using options
void setThrottlerRates(const ThrottlerOptions& options);
virtual ~Throttler() {
}
/// Anyone who is using the throttler should call this method to maintain
/// the refCount_ and startTime_ correctly
void startTransfer();
/// Method to de-register the transfer and decrement the refCount_
void endTransfer();
/// Get the average rate per sec
double getAvgRatePerSec();
/// Get the bucket rate per sec
double getPeakRatePerSec();
/// Get the bucket limit
double getBucketLimit();
/// Get the throttler logging time period in millis
int64_t getThrottlerLogTimeMillis();
/// Set the throttler logging time in millis
void setThrottlerLogTimeMillis(int64_t throttlerLogTimeMillis);
/// Get tokens processed till now
double getProgress();
friend std::ostream& operator<<(std::ostream& stream,
const Throttler& throttler);
private:
/**
* Sometimes the options passed to throttler might not make sense so this
* method tries to auto configure them
*/
static void configureOptions(double& avgRatePerSec, double& peakRatePerSec,
double& bucketLimit);
/**
* This method is invoked repeatedly with the amount of progress made
* (e.g. number of tokens processed) till now. If the total progress
* till now is over the allowed average progress then it returns the
* time to sleep for the calling thread
* @param now Pass in the current time stamp
*/
double averageThrottler(const Clock::time_point& now);
void limitInternal(ThreadCtx* threadCtx, int64_t deltaProgress);
void limitSingleRequest(ThreadCtx* threadCtx, int64_t deltaProgress);
void sleep(double sleepTimeSecs) const;
void resetState();
/**
* This method periodically prints logs.
* The period is defined by FLAGS_peak_log_time_ms
* @param deltaProgress Progress since last call to limit()
* @param now The time point caller has
* @param sleepTimeSeconds Duration of sleep caused by limit()
*/
void printPeriodicLogs(const Clock::time_point& now, double deltaProgress);
/// Records the time the throttler was started
Clock::time_point startTime_;
/**
* Throttler logs the average and instantaneous progress
* periodically (check FLAGS_peak_log_time_ms). lastLogTime_ is
* the last time this log was written
*/
Clock::time_point lastLogTime_;
/// Instant progress in the time stats were logged last time
double instantProgress_{0};
// Records the total progress in tokens till now
double progress_{0};
/// Last time the token bucket was filled
std::chrono::time_point<std::chrono::high_resolution_clock> lastFillTime_;
protected:
/// Number of tokens in the bucket
int64_t tokenBucket_;
/// Controls the access across threads
folly::SpinLock throttlerMutex_;
/// Number of users of this throttler
int64_t refCount_{0};
/// The average rate expected
double avgRatePerSec_;
/// Limit on the max number of tokens
double tokenBucketLimit_;
/// Rate at which bucket is filled
double bucketRatePerSec_;
/// Max number of tokens that can be requested in a single call
int64_t singleRequestLimit_;
/// Interval between every print of throttler logs
int64_t throttlerLogTimeMillis_;
};
}
} // facebook::wdt