utils/bandwidth/limiter.go (105 lines of code) (raw):
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bandwidth
import (
"errors"
"fmt"
"time"
"github.com/uber/kraken/utils/log"
"github.com/uber/kraken/utils/memsize"
"go.uber.org/zap"
"golang.org/x/time/rate"
)
// Config defines Limiter configuration.
type Config struct {
EgressBitsPerSec uint64 `yaml:"egress_bits_per_sec"`
IngressBitsPerSec uint64 `yaml:"ingress_bits_per_sec"`
// TokenSize defines the granularity of a token in the bucket. It is used to
// avoid integer overflow errors that would occur if we mapped each bit to a
// token.
TokenSize uint64 `yaml:"token_size"`
Enable bool `yaml:"enable"`
}
func (c Config) applyDefaults() Config {
if c.TokenSize == 0 {
c.TokenSize = 8 * memsize.Mbit
}
return c
}
// Limiter limits egress and ingress bandwidth via token-bucket rate limiter.
type Limiter struct {
config Config
egress *rate.Limiter
ingress *rate.Limiter
logger *zap.SugaredLogger
}
// Option allows setting optional parameters in Limiter.
type Option func(*Limiter)
// WithLogger configures a Limiter with a custom logger.
func WithLogger(logger *zap.SugaredLogger) Option {
return func(l *Limiter) { l.logger = logger }
}
// NewLimiter creates a new Limiter.
func NewLimiter(config Config, opts ...Option) (*Limiter, error) {
config = config.applyDefaults()
l := &Limiter{
config: config,
logger: log.Default(),
}
for _, opt := range opts {
opt(l)
}
if !config.Enable {
l.logger.Warn("Bandwidth limits disabled")
return l, nil
}
if config.EgressBitsPerSec == 0 {
return nil, errors.New("invalid config: egress_bits_per_sec must be non-zero")
}
if config.IngressBitsPerSec == 0 {
return nil, errors.New("invalid config: ingress_bits_per_sec must be non-zero")
}
l.logger.Infof("Setting egress bandwidth to %s/sec", memsize.BitFormat(config.EgressBitsPerSec))
l.logger.Infof("Setting ingress bandwidth to %s/sec", memsize.BitFormat(config.IngressBitsPerSec))
etps := config.EgressBitsPerSec / config.TokenSize
itps := config.IngressBitsPerSec / config.TokenSize
l.egress = rate.NewLimiter(rate.Limit(etps), int(etps))
l.ingress = rate.NewLimiter(rate.Limit(itps), int(itps))
return l, nil
}
func (l *Limiter) reserve(rl *rate.Limiter, nbytes int64) error {
if !l.config.Enable {
return nil
}
tokens := int(uint64(nbytes*8) / l.config.TokenSize)
if tokens == 0 {
tokens = 1
}
r := rl.ReserveN(time.Now(), tokens)
if !r.OK() {
return fmt.Errorf(
"cannot reserve %s of bandwidth, max is %s",
memsize.Format(uint64(nbytes)),
memsize.BitFormat(l.config.TokenSize*uint64(rl.Burst())))
}
time.Sleep(r.Delay())
return nil
}
// ReserveEgress blocks until egress bandwidth for nbytes is available.
// Returns error if nbytes is larger than the maximum egress bandwidth.
func (l *Limiter) ReserveEgress(nbytes int64) error {
return l.reserve(l.egress, nbytes)
}
// ReserveIngress blocks until ingress bandwidth for nbytes is available.
// Returns error if nbytes is larger than the maximum ingress bandwidth.
func (l *Limiter) ReserveIngress(nbytes int64) error {
return l.reserve(l.ingress, nbytes)
}
// Adjust divides the originally configured egress and ingress bps by denominator.
// Note, because the original configuration is always used, multiple Adjust calls
// have no affect on each other.
func (l *Limiter) Adjust(denominator int) error {
if denominator <= 0 {
return errors.New("denominator must be greater than 0")
}
ebps := max(l.config.EgressBitsPerSec/l.config.TokenSize/uint64(denominator), 1)
ibps := max(l.config.IngressBitsPerSec/l.config.TokenSize/uint64(denominator), 1)
l.egress.SetLimit(rate.Limit(ebps))
l.ingress.SetLimit(rate.Limit(ibps))
return nil
}
// EgressLimit returns the current egress limit.
func (l *Limiter) EgressLimit() int64 {
return int64(l.egress.Limit())
}
// IngressLimit returns the current ingress limit.
func (l *Limiter) IngressLimit() int64 {
return int64(l.ingress.Limit())
}
func max(a, b uint64) uint64 {
if a > b {
return a
}
return b
}