pkg/download/retry.go (79 lines of code) (raw):
package download
import (
"fmt"
"io"
"math"
"net/http"
"time"
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
)
// SleepFunc pauses the execution for at least duration d.
type SleepFunc func(d time.Duration)
var (
// ActualSleep uses actual time to pause the execution.
ActualSleep SleepFunc = time.Sleep
)
const (
// time to sleep between retries is an exponential backoff formula:
// t(n) = k * m^n
expRetryN = 3 // how many times we retry the Download
expRetryK = time.Second * 3
expRetryM = 2
)
// WithRetries retrieves a response body using the specified downloader. Any
// error returned from d will be retried (and retrieved response bodies will be
// closed on failures). If the retries do not succeed, the last error is returned.
//
// It sleeps in exponentially increasing durations between retries.
func WithRetries(ctx *log.Context, downloaders []Downloader, sf SleepFunc) (io.ReadCloser, error) {
var downloadErrors error
for _, d := range downloaders {
for n := 0; n < expRetryN; n++ {
ctx := ctx.With("retry", n)
status, out, err := Download(ctx, d)
if err == nil {
return out, nil
}
if downloadErrors != nil {
downloadErrors = errors.Wrapf(downloadErrors, fmt.Sprintf("Attempt %d: %s ", n+1, err.Error()))
} else {
downloadErrors = err
}
ctx.Log("error", err)
if out != nil { // we are not going to read this response body
out.Close()
}
// If there is an access issue while downloading using this downloader, use next downloader
// For ex. User may have set up access to blob using managed identity, but not using public blob access or vice-versa.
if isAccessIssueHttpStatusCode(status) {
break
}
// status == -1 the value when there was no http request
if status != -1 && !isTransientHttpStatusCode(status) {
ctx.Log("info", fmt.Sprintf("downloader %T returned %v, skipping retries", d, status))
break
}
if n != expRetryN-1 {
// have more retries to go, sleep before retrying
slp := expRetryK * time.Duration(int(math.Pow(float64(expRetryM), float64(n))))
ctx.Log("sleep", slp)
sf(slp)
}
}
}
return nil, downloadErrors
}
func isTransientHttpStatusCode(statusCode int) bool {
switch statusCode {
case
http.StatusRequestTimeout, // 408
http.StatusTooManyRequests, // 429
http.StatusInternalServerError, // 500
http.StatusBadGateway, // 502
http.StatusServiceUnavailable, // 503
http.StatusGatewayTimeout: // 504
return true // timeout and too many requests
default:
return false
}
}
func isAccessIssueHttpStatusCode(statusCode int) bool {
switch statusCode {
case
http.StatusUnauthorized, // 401
http.StatusForbidden, // 403
http.StatusNotFound, // 404
http.StatusConflict: // 409
return true
default:
return false
}
}