exponential/exponential.go (214 lines of code) (raw):
package exponential
import (
"context"
"errors"
"fmt"
"math/rand"
"sync"
"testing"
"time"
)
// timer wraps a channel that receives a time.Time when the timer fires. It exists so that
// package-internal tests can substitute a fake timer for a real time.Timer.
type timer struct {
	// C receives a time.Time when the timer fires.
	// When faking, c holds the same channel in bidirectional form so the fake can send on it.
	C <-chan time.Time
	// c is set only when faking; it is the same channel as C without the receive-only restriction.
	c chan time.Time
	// when is the time the timer is set to go off.
	when time.Time
	// timer is the real time.Timer backing this value; nil when faking.
	timer *time.Timer
	// mu guards the fields declared after it.
	mu sync.Mutex
	// stopped records whether Stop has been called. Only meaningful when faking.
	stopped bool
}

// Stop mirrors time.Timer.Stop. For a real timer it reports whether the call stopped the
// timer before it fired. For a fake timer (no underlying time.Timer) it records that Stop
// was called and always reports true.
func (t *timer) Stop() bool {
	if rt := t.timer; rt != nil {
		return rt.Stop()
	}
	t.mu.Lock()
	t.stopped = true
	t.mu.Unlock()
	return true
}
// clock abstracts the time functions Backoff depends on so that package-internal tests can
// substitute a fake. When Backoff.clock is nil, the time package is used directly instead.
type clock interface {
	// Now reports the current time.
	Now() time.Time
	// Until reports the duration until t.
	Until(t time.Time) time.Duration
	// NewTimer starts a timer that fires after d.
	NewTimer(d time.Duration) *timer
}
// Backoff provides a mechanism for retrying operations with exponential backoff. It can be used in
// tests without a fake/mock interface to simulate retries, either by using the WithTesting()
// option or by setting a Policy that works with your test. This keeps code leaner, avoids
// dynamic dispatch and unneeded allocations, and is easier to test.
//
// Create one with New. Retry does not modify these fields.
type Backoff struct {
	// policy is the backoff policy to use. Retry reads its InitialInterval, Multiplier,
	// MaxInterval, MaxAttempts and RandomizationFactor.
	policy Policy
	// useTest is true if we are using the test options. Set with WithTesting(). When true,
	// Retry skips the real wait between attempts.
	useTest bool
	// transformers is a list of error transformers applied, in order, to the error before
	// determining if we should retry.
	transformers []ErrTransformer
	// clock is used to allow internal testing of the package.
	// If nil, the time package is used.
	clock clock
}
// Option configures a Backoff. Pass one or more Options to New, which applies them in order;
// if an Option returns an error, New fails with that error.
type Option func(*Backoff) error
// WithPolicy sets the backoff policy to use. Without it, New starts from the package defaults.
// Either way, New validates the final policy.
func WithPolicy(policy Policy) Option {
	var setPolicy Option = func(b *Backoff) error {
		b.policy = policy
		return nil
	}
	return setPolicy
}
type (
	// testOptions collects settings for WithTesting(). It is currently empty and reserved
	// for future test options.
	testOptions struct{}

	// TestOption is an option for WithTesting(). Functions of this type are intended to
	// provide test-only settings. It is a placeholder and is not used at this time.
	TestOption func(t *testOptions) error
)
// WithTesting makes Retry skip the real wait between attempts. The intervals are still
// computed and recorded in each Record.
//
// It panics when called outside a test binary (as reported by testing.Testing). The check
// happens when the Option is built, not when it is applied. The TestOption arguments are
// currently ignored.
func WithTesting(options ...TestOption) Option {
	if testing.Testing() {
		return func(b *Backoff) error {
			b.useTest = true
			return nil
		}
	}
	panic("called WithTesting outside of a test")
}
// ErrTransformer rewrites the error returned by an Op before Retry decides whether to try again.
//
// The typical use is to make an error permanent (so that errors.Is(err, ErrPermanent) reports
// true) based on some criteria, which stops retries. Another use is to wrap the error in an
// ErrRetryAfter to set the minimum time the next retry must wait, e.g. based on a response
// from a service. Transformers let custom retry logic be packaged in one place for reuse
// instead of being repeated in every Op.
//
// Transformers run in order and each receives the previous one's result, so the error Retry
// acts on is the output of the last transformer.
type ErrTransformer func(err error) error
// WithErrTransformer sets the error transformers that Retry runs, in order, on each failed
// attempt's error before deciding whether to retry. If not specified, no transformers are used.
// Passing WithErrTransformer more than once replaces the earlier list instead of appending to it,
// so only the final transformers are used.
//
// The transformers are copied when the Option is built, so later writes to the caller's slice
// cannot change (or race with) a configured Backoff. A nil transformer makes New return an
// error instead of causing a panic inside Retry.
func WithErrTransformer(transformers ...ErrTransformer) Option {
	// Detach from the caller's backing array; variadic calls like f(ts...) alias it.
	ts := append([]ErrTransformer(nil), transformers...)
	return func(b *Backoff) error {
		for i, t := range ts {
			if t == nil {
				return fmt.Errorf("WithErrTransformer: transformer at index %d is nil", i)
			}
		}
		b.transformers = ts
		return nil
	}
}
// New builds a Backoff from the package defaults, then applies options in order and validates
// the resulting policy. If any Option or the validation fails, that error is returned along
// with a nil Backoff.
func New(options ...Option) (*Backoff, error) {
	backoff := new(Backoff)
	backoff.policy = defaults()
	for i := range options {
		if err := options[i](backoff); err != nil {
			return nil, err
		}
	}
	if err := backoff.policy.validate(); err != nil {
		return nil, err
	}
	return backoff, nil
}
// Record is the record of a Retry attempt. Retry passes one to every invocation of the Op.
type Record struct {
	// Attempt is the number of attempts (initial + retries), counting the current one. Retry
	// passes 1 on the first call and increments it before each retry. A zero value of Record
	// has Attempt == 0.
	Attempt int
	// LastInterval is the interval used before the current attempt (0 on the first attempt).
	// With WithTesting() it is still recorded even though no real wait happened.
	LastInterval time.Duration
	// TotalInterval is the total amount of time spent in intervals between attempts, i.e. the
	// sum of every LastInterval so far.
	TotalInterval time.Duration
	// Err is the error returned by the previous invocation of the Op (nil on the first attempt).
	// It is the raw Op error, before any ErrTransformer runs. Use it only for logging
	// purposes.
	Err error
}
// now reports the current time from the injected clock, or from time.Now when no clock is
// set. Calling time.Now directly in the common case avoids an interface call.
func (b *Backoff) now() time.Time {
	if c := b.clock; c != nil {
		return c.Now()
	}
	return time.Now()
}
// until reports the duration until t using the injected clock, or time.Until when no clock
// is set. Calling time.Until directly in the common case avoids an interface call.
func (b *Backoff) until(t time.Time) time.Duration {
	if c := b.clock; c != nil {
		return c.Until(t)
	}
	return time.Until(t)
}
// newTimer starts a timer that fires after d. It uses the injected clock when one is set;
// otherwise it wraps a real time.Timer, which avoids an interface call in the common case.
func (b *Backoff) newTimer(d time.Duration) *timer {
	if c := b.clock; c != nil {
		return c.NewTimer(d)
	}
	std := time.NewTimer(d)
	return &timer{timer: std, C: std.C}
}
// Op is an operation that Retry may call repeatedly. It receives the context given to Retry
// and a Record describing the current attempt. Returning nil ends the retries.
type Op func(ctx context.Context, r Record) error
type (
	// RetryOption is an option for the Retry method. Functions that implement RetryOption
	// provide an override for a single call.
	RetryOption func(o *retryOptions) error

	// retryOptions holds per-call overrides for Retry(). It is currently empty and exists so
	// that overrides can be added later without breaking the API.
	retryOptions struct{}
)
// Retry calls op until it succeeds or one of these happens:
//   - the context is done, or its deadline would pass before the next attempt;
//   - MaxAttempts is reached;
//   - op's error (after the error transformers run) wraps ErrPermanent.
//
// Between attempts it waits an exponentially growing, randomized interval, or longer if the
// error carries an ErrRetryAfter hint. Retry does not modify the Backoff, so it is safe to
// call concurrently.
//
// Return values:
//   - nil if any attempt succeeds.
//   - the transformed error, if it wraps ErrPermanent.
//   - an error wrapping the last op error and ErrPermanent, once MaxAttempts is exhausted.
//   - an error wrapping the last op error and ErrRetryCanceled, if the context ends first.
//
// options is reserved for future per-call overrides and is currently ignored.
func (b *Backoff) Retry(ctx context.Context, op Op, options ...RetryOption) error {
	r := Record{Attempt: 1}

	// Make our first attempt.
	err := op(ctx, r)
	if err == nil {
		return nil
	}

	// Well, that didn't work, so let's start our retry work.
	r.Err = err
	baseInterval := b.policy.InitialInterval
	realInterval := b.randomize(baseInterval)

	for {
		// Transformers may promote err to a permanent error or attach a retry-after hint.
		err = b.applyTransformers(err)
		if errors.Is(err, ErrPermanent) {
			return err
		}
		if b.policy.MaxAttempts > 0 && r.Attempt >= b.policy.MaxAttempts {
			return fmt.Errorf("exceeded max attempts: %w: %w", r.Err, ErrPermanent)
		}

		// A retry-after hint in the error that is longer than our exponential
		// interval takes precedence.
		realInterval = b.intervalSpecified(err, realInterval)

		// Give up if the context is done or its deadline would expire during the wait.
		if !b.ctxOK(ctx, realInterval) {
			return fmt.Errorf("%w: %w", r.Err, ErrRetryCanceled)
		}

		// WithTesting() skips the real wait; the interval is still recorded below.
		if !b.useTest {
			timer := b.newTimer(realInterval)
			select {
			case <-ctx.Done():
				timer.Stop() // We will not wait for it; release the timer promptly.
				return fmt.Errorf("%w: %w", r.Err, ErrRetryCanceled)
			case <-timer.C:
			}
		}

		// Record our last interval, the total interval and the attempt number.
		r.LastInterval = realInterval
		r.TotalInterval += realInterval
		r.Attempt++

		// NO WHAMMIES, NO WHAMMIES, STOP!
		// https://www.youtube.com/watch?v=1mGrM72Z4-Y
		err = op(ctx, r)
		if err == nil {
			return nil
		}
		// Capture our last error in the record.
		r.Err = err

		// Grow the base interval, capped at MaxInterval. The comparison is done in float64
		// before converting. Converting an out-of-range float to time.Duration is
		// implementation-defined in Go (often a large negative value), so an overflowing
		// product could otherwise slip under the cap.
		next := float64(baseInterval) * b.policy.Multiplier
		if next > float64(b.policy.MaxInterval) {
			baseInterval = b.policy.MaxInterval
		} else {
			baseInterval = time.Duration(next)
		}

		// Randomize the interval based on our randomization factor.
		realInterval = b.randomize(baseInterval)
	}
}
// applyTransformers feeds err through every configured transformer in order. Each one
// receives the previous one's output. With no transformers, err is returned unchanged.
func (b *Backoff) applyTransformers(err error) error {
	for i := range b.transformers {
		err = b.transformers[i](err)
	}
	return err
}
// randomize applies jitter to interval, in either the negative or the positive direction.
// With delta = RandomizationFactor * interval, the result is drawn from
// [interval-delta, interval+delta). For example, with a factor of 0.5 and an interval of
// 1s the result falls in [0.5s, 1.5s).
//
// Edge cases:
//   - A zero factor returns interval unchanged.
//   - If the jitter window truncates to nothing, the lower bound is returned.
//   - A negative result (e.g. when the factor exceeds 1) is clamped to 0, so callers
//     never see a negative wait.
func (b *Backoff) randomize(interval time.Duration) time.Duration {
	factor := b.policy.RandomizationFactor
	if factor == 0 {
		return interval
	}

	// lo/hi rather than min/max so the Go 1.21 builtins are not shadowed.
	delta := time.Duration(factor * float64(interval))
	lo, hi := interval-delta, interval+delta

	jittered := lo
	// rand.Int63n panics for n <= 0, so only draw when the window is non-empty.
	if span := hi - lo; span > 0 {
		jittered += time.Duration(rand.Int63n(int64(span))) // #nosec G404 -- jitter does not need a CSPRNG
	}
	if jittered < 0 {
		return 0
	}
	return jittered
}
// intervalSpecified picks the wait before the next attempt. If err carries a retry-after hint
// (an ErrRetryAfter found by errHasRetryInterval) that is positive and longer than
// expInterval, the hint is used. Otherwise the exponential interval expInterval is used. A
// hint can therefore lengthen our backoff but never shorten it.
func (b *Backoff) intervalSpecified(err error, expInterval time.Duration) time.Duration {
	if hinted := b.errHasRetryInterval(err); hinted > 0 && hinted > expInterval {
		return hinted
	}
	return expInterval
}
// errHasRetryInterval searches the whole error tree of err for ErrRetryAfter values. It returns
// the longest positive duration from now until their Time, or 0 if there is no usable hint.
//
// The walk follows both Unwrap() error and Unwrap() []error (errors.Join, fmt.Errorf with
// several %w verbs). Pairing errors.As with errors.Unwrap would stop at multi-error nodes and
// miss hints in other branches. Each node is matched the way errors.As matches it: either it
// is an ErrRetryAfter value, or it has an As(any) bool method that fills one in.
func (b *Backoff) errHasRetryInterval(err error) time.Duration {
	var longest time.Duration
	pending := []error{err}
	for len(pending) > 0 {
		last := len(pending) - 1
		node := pending[last]
		pending = pending[:last]
		if node == nil {
			continue
		}

		hint, ok := node.(ErrRetryAfter)
		if !ok {
			if m, hasAs := node.(interface{ As(any) bool }); hasAs {
				ok = m.As(&hint)
			}
		}
		if ok {
			if d := b.until(hint.Time); d > longest {
				longest = d
			}
		}

		switch u := node.(type) {
		case interface{ Unwrap() error }:
			pending = append(pending, u.Unwrap())
		case interface{ Unwrap() []error }:
			pending = append(pending, u.Unwrap()...)
		}
	}
	return longest
}
// ctxOK reports whether Retry may wait interval and then try again. It returns false in
// three cases: the context is already done, its deadline has passed, or the deadline would
// arrive before interval elapses. A context without a deadline is OK as long as it is not
// done.
func (b *Backoff) ctxOK(ctx context.Context, interval time.Duration) bool {
	if ctx.Err() != nil {
		return false
	}
	deadline, hasDeadline := ctx.Deadline()
	if !hasDeadline {
		return true
	}
	left := b.until(deadline)
	return left > 0 && left >= interval
}