swim/join_delayer.go (102 lines of code) (raw):
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package swim
import (
"math"
"math/rand"
"time"
"github.com/uber-common/bark"
"github.com/uber/ringpop-go/logging"
"github.com/uber/ringpop-go/util"
)
const (
defaultInitial = 100 * time.Millisecond
defaultMax = 60 * time.Second
)
var delayerRand = rand.New(rand.NewSource(time.Now().UnixNano()))
var defaultRandomizer = delayerRand.Intn
var defaultSleeper = time.Sleep
var noDelay = time.Duration(0)
// joinDelayer defines the API of a delayer implementation that
// applies a delay in between repeated join attempts.
type joinDelayer interface {
delay() time.Duration
}
// delayRandomizer is a function that returns a random number between
// 0 and an upper-bound, provided in its first argument.
type delayRandomizer func(int) int
// delaySleeper is a function that pauses execution for time.Duration.
type delaySleeper func(time.Duration)
// delayOpts is a struct that houses substitutable parameters of a joinDelayer
// in order to alter its behavior.
type delayOpts struct {
initial time.Duration
max time.Duration
randomizer delayRandomizer
sleeper delaySleeper
}
// newDelayOpts creates a delayOpts struct with default values.
func newDelayOpts() *delayOpts {
return &delayOpts{
initial: defaultInitial,
max: defaultMax,
randomizer: defaultRandomizer,
sleeper: defaultSleeper,
}
}
// exponentialDelayer applies a delay in between repeated join attempts.
// The delay increases exponentially and is capped at maxDelay.
type exponentialDelayer struct {
// logger logs when a maximum delay is reached.
logger bark.Logger
// initialDelay is the fixed portion of the delay by which
// the exponential backoff is multiplied.
initialDelay time.Duration
// maxDelay is the maximum delay applied to a join attempt.
maxDelay time.Duration
// maxDelayReached is a flag that is toggled once the delay
// applied to a join attempt reaches or exceeds the max delay.
maxDelayReached bool
// nextDelayMin tracks the last used upper-bound for a join attempt delay.
// Upon the next join attempt, nextDelayMin will be used as the lower-bound
// for that delay. This acts as a shifting window for the bounds of the
// exponential backoff.
nextDelayMin float64
// randomizer generates a random delay in between a nextDelayMin
// and maxDelay.
randomizer delayRandomizer
// sleeper pauses execution of a join attempt.
sleeper delaySleeper
// numDelays tracks the number of times delay has been called on the
// delayer. It's also used as the backoff exponent.
numDelays uint
}
// newExponentialDelayer creates a new exponential delayer. joiner is required.
// opts is optional.
func newExponentialDelayer(joiner string, opts *delayOpts) (*exponentialDelayer, error) {
if opts == nil {
opts = newDelayOpts()
}
randomizer := opts.randomizer
if randomizer == nil {
randomizer = defaultRandomizer
}
sleeper := opts.sleeper
if sleeper == nil {
sleeper = defaultSleeper
}
return &exponentialDelayer{
logger: logging.Logger("join").WithField("local", joiner),
initialDelay: opts.initial,
nextDelayMin: 0,
maxDelayReached: false,
maxDelay: opts.max,
randomizer: randomizer,
sleeper: sleeper,
numDelays: 0,
}, nil
}
// delay delays a join attempt by sleeping for an amount of time. The
// amount of time is computed as an exponential backoff based on the number
// of join attempts that have been made at the time of the function call;
// the number of attempts is 0-based. It returns a time.Duration equal to the
// amount of delay applied.
func (d *exponentialDelayer) delay() time.Duration {
// Convert durations to time in millis
initialDelayMs := float64(util.MS(d.initialDelay))
maxDelayMs := float64(util.MS(d.maxDelay))
// Compute uncapped exponential delay (exponent is the number of join
// attempts so far). Then, make sure the computed delay is capped at its
// max. Apply a random jitter to the actual sleep duration and finally,
// sleep.
uncappedDelay := initialDelayMs * math.Pow(2, float64(d.numDelays))
cappedDelay := math.Min(maxDelayMs, uncappedDelay)
// If cappedDelay and nextDelayMin are equal, we have reached the point
// at which the exponential backoff has reached its max; apply no more
// jitter.
var jitteredDelay int
if cappedDelay == d.nextDelayMin {
jitteredDelay = int(cappedDelay)
} else {
jitteredDelay = d.randomizer(int(cappedDelay-d.nextDelayMin)) + int(d.nextDelayMin)
}
// If this is the first time an uncapped delay reached or exceeded the
// maximum allowable delay, log a message.
if uncappedDelay >= maxDelayMs && d.maxDelayReached == false {
d.logger.WithFields(bark.Fields{
"numDelays": d.numDelays,
"initialDelay": d.initialDelay,
"minDelay": d.nextDelayMin,
"maxDelay": d.maxDelay,
"uncappedDelay": uncappedDelay,
"cappedDelay": cappedDelay,
"jitteredDelay": jitteredDelay,
}).Warn("ringpop join attempt delay reached max")
d.maxDelayReached = true
}
// Set lower-bound for next attempt to maximum of current attempt.
d.nextDelayMin = cappedDelay
sleepDuration := time.Duration(jitteredDelay) * time.Millisecond
d.sleeper(sleepDuration)
// Increment the exponent used for backoff calculation.
d.numDelays++
return sleepDuration
}
// nullDelayer is an empty implementation of joinDelayer.
type nullDelayer struct{}
// delay applies no delay.
func (d *nullDelayer) delay() time.Duration {
return time.Duration(0)
}