lib/throttle.go (69 lines of code) (raw):

/** * Copyright (c) Facebook, Inc. and its affiliates. * * This source code is licensed under the MIT license found in the * LICENSE file in the root directory of this source tree. */ package dhcplb import ( "fmt" "sync" "github.com/golang/glog" "github.com/hashicorp/golang-lru" "golang.org/x/time/rate" ) // An LRU cache implementation of Throttle. // // We keep track of request rates per client in an LRU cache to // keep memory usage under control against malcious requests. Each // value in the cache is a rate.Limiter struct which is an implementation // of Taken Bucket algorithm. // // Adding new items to the cache is also limited to control cache // invalidation rate. // type Throttle struct { mu sync.Mutex lru *lru.Cache maxRatePerItem int cacheLimiter *rate.Limiter cacheRate int } // Returns true if the rate is below maximum for the given key func (c *Throttle) OK(key interface{}) (bool, error) { if c.maxRatePerItem <= 0 { return true, nil } c.mu.Lock() defer c.mu.Unlock() // If the limiter is not in the cache for the given key // check for the cache limiter. If it is below the maximum, // then create a limiter, add it to the cache and allocate a bucket. value, ok := c.lru.Get(key) if !ok { if c.cacheLimiter.Allow() { limiter := rate.NewLimiter(rate.Limit(c.maxRatePerItem), c.maxRatePerItem) c.lru.Add(key, limiter) return limiter.Allow(), nil } err := fmt.Errorf("Cache invalidation is too fast (max: %d item/sec) - throttling", c.cacheRate) return false, err } // So the limiter object is in the cache. Try to allocate a bucket. limiter := value.(*rate.Limiter) if !limiter.Allow() { err := fmt.Errorf("Request rate is too high for %v (max: %d req/sec) - throttling", key, c.maxRatePerItem) return false, err } return true, nil } func (c *Throttle) len() int { return c.lru.Len() } func (c *Throttle) setRate(MaxRatePerItem int) { c.mu.Lock() defer c.mu.Unlock() c.maxRatePerItem = MaxRatePerItem } // NewThrottle returns a Throttle struct // // Capacity: // Maximum capacity of the LRU cache // // CacheRate (per second): // Maximum allowed rate for adding new items to the cache. By that way it // prevents the cache invalidation to happen too soon for the existing rate // items in the cache. Cache rate will be infinite for 0 or negative values. // // MaxRatePerItem (per second): // Maximum allowed requests rate for each key in the cache. Throttling will // be disabled for 0 or negative values. No cache will be created in that case. // func NewThrottle(Capacity int, CacheRate int, MaxRatePerItem int) (*Throttle, error) { if MaxRatePerItem <= 0 { glog.Info("No throttling will be done") } cache, err := lru.New(int(Capacity)) if err != nil { return nil, err } // Keep track of the item creation rate. var cacheLimiter *rate.Limiter if CacheRate <= 0 { glog.Info("No cache rate limiting will be done") cacheLimiter = rate.NewLimiter(rate.Inf, 1) // bucket size is ignored } else { cacheLimiter = rate.NewLimiter(rate.Limit(CacheRate), CacheRate) } throttle := &Throttle{ lru: cache, maxRatePerItem: MaxRatePerItem, cacheLimiter: cacheLimiter, cacheRate: CacheRate, } return throttle, nil }