go/mqtt/retry/exponential_backoff.go (83 lines of code) (raw):
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
package retry
import (
"context"
"log/slog"
"math"
"math/rand"
"time"
"github.com/Azure/iot-operations-sdks/go/internal/log"
"github.com/Azure/iot-operations-sdks/go/internal/wallclock"
)
// ExponentialBackoff implements a retry policy with exponential backoff and
// optional jitter.
type ExponentialBackoff struct {
// MaxAttempts sets the maximum number of attempts. The default value of 0
// indicates unlimited attempts; setting this to 1 will disable retries.
MaxAttempts uint64
// MinInterval is the maximum interval between retries (before jitter).
// Will be set to a default of 1/8s if unspecified.
MinInterval time.Duration
// MaxInterval is the maximum interval between retries (before jitter).
// Will be set to a default of one minute if unspecified.
MaxInterval time.Duration
// Timeout is the total timeout for all retries.
Timeout time.Duration
// NoJitter removes the default jitter.
NoJitter bool
// Logger provides a logger which will be used to log retry attempts and
// results.
Logger *slog.Logger
}
// Start initiates the retry executions.
func (e *ExponentialBackoff) Start(
ctx context.Context,
name string,
task Task,
) error {
// Create a context with timeout if specified.
if e.Timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, e.Timeout)
defer cancel()
}
l := logger{log.Wrap(e.Logger)}
for attempt := uint64(1); ; attempt++ {
l.attempt(ctx, name, attempt)
retry, err := task(ctx)
if err == nil {
l.complete(ctx, name, attempt, nil)
return nil
}
interval := e.shouldRetry(ctx, attempt, retry)
if interval == 0 {
l.complete(ctx, name, attempt, err)
return err
}
l.retry(ctx, name, attempt, err, interval)
select {
case <-wallclock.Instance.After(interval):
case <-ctx.Done():
err := context.Cause(ctx)
l.complete(ctx, name, attempt, err)
return err
}
}
}
// Decide if we need to continue/start retrying the target operations based on
// the retry count and other conditions.
func (e *ExponentialBackoff) shouldRetry(
ctx context.Context,
attempt uint64,
retry bool,
) time.Duration {
switch {
case !retry,
attempt == e.MaxAttempts,
ctx.Err() != nil:
return 0
}
minInterval := e.MinInterval
if minInterval == 0 {
minInterval = time.Second / 8
}
maxInterval := e.MaxInterval
if maxInterval == 0 {
maxInterval = time.Minute
}
// Calculate exponent and clamp to max exponent.
factor := math.Pow(2, min(
float64(attempt-1),
math.Log2(float64(maxInterval)/float64(minInterval)),
))
if !e.NoJitter {
factor = e.jitter(factor)
}
return time.Duration(factor * float64(minInterval))
}
// Add random jitter to the base time to avoid synchronicity in retry attempts.
// The jitter is between 95% and 105% of the base time.
func (*ExponentialBackoff) jitter(base float64) float64 {
// #nosec G404
j := rand.New(rand.NewSource(wallclock.Instance.Now().UnixNano())).Float64()
return base * (.95 + .1*j)
}