registry/storage/driver/s3-aws/v1/s3wrapper.go (338 lines of code) (raw):

package v1 import ( "errors" "fmt" "net/http" "slices" "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3iface" "github.com/cenkalti/backoff/v4" "github.com/docker/distribution/registry/storage/driver/s3-aws/common" "golang.org/x/time/rate" ) // The SDK does not define these constants in the s3 package, other packages // define some of them but not both, and the context of each package // name will make it confusing. // Full list of errors https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html const ( errCodeInternalError = "InternalError" errCodeSlowDown = "SlowDown" ) // retryableErrors contains a list of recommended errors to retry for certain S3 errors // https://repost.aws/knowledge-center/http-5xx-errors-s3 var retryableErrors = []string{ errCodeInternalError, errCodeSlowDown, } type backoffConstructor func() backoff.BackOff func withNoExponentialBackoff() backoff.BackOff { return &noBackoff{} } // noBackoff disables exponential backoffs. type noBackoff struct{} // NextBackOff always returns backoff.Stop to signal the caller not to retry the operation. func (*noBackoff) NextBackOff() time.Duration { return backoff.Stop } // Reset to initial state. func (*noBackoff) Reset() {} type WrapperOpt func(*S3wrapper) func WithRateLimit(maximum int64, burst int) WrapperOpt { return func(w *S3wrapper) { w.Limiter = rate.NewLimiter(rate.Limit(maximum), burst) } } func WithBackoffNotify(n backoff.Notify) WrapperOpt { return func(w *S3wrapper) { w.notify = n } } func WithExponentialBackoff(maximum int64) WrapperOpt { if maximum < 0 { maximum = 0 } return func(w *S3wrapper) { w.backoff = func() backoff.BackOff { b := backoff.NewExponentialBackOff() b.InitialInterval = common.DefaultInitialInterval b.RandomizationFactor = common.DefaultRandomizationFactor b.Multiplier = common.DefaultMultiplier b.MaxInterval = common.DefaultMaxInterval b.MaxElapsedTime = common.DefaultMaxElapsedTime // nolint:gosec // there is no overflow here, max is always positive return backoff.WithMaxRetries(b, uint64(maximum)) } } } var _ common.S3WrapperIf = (*S3wrapper)(nil) // S3wrapper implements a subset of s3iface.S3API allowing us to rate limit, // retry, add trace logging, or otherwise improve s3 calls made by the driver. type S3wrapper struct { s3 s3iface.S3API *rate.Limiter backoff backoffConstructor notify backoff.Notify } func NewS3Wrapper(s3API s3iface.S3API, opts ...WrapperOpt) *S3wrapper { w := &S3wrapper{ s3: s3API, Limiter: rate.NewLimiter(rate.Inf, 0), backoff: withNoExponentialBackoff, } for _, o := range opts { o(w) } return w } func (w *S3wrapper) PutObjectWithContext(ctx aws.Context, input *s3.PutObjectInput, opts ...request.Option) (*s3.PutObjectOutput, error) { var out *s3.PutObjectOutput err := w.waitRetryNotify( ctx, func() error { var err error out, err = w.s3.PutObjectWithContext(ctx, input, opts...) // a nil response must be captured as an error (if no error is provided) if out == nil && err == nil { err = nilRespError("PutObjectWithContext") } return err }, ) return out, err } func (w *S3wrapper) GetObjectWithContext(ctx aws.Context, input *s3.GetObjectInput, opts ...request.Option) (*s3.GetObjectOutput, error) { var out *s3.GetObjectOutput err := w.waitRetryNotify( ctx, func() error { var err error out, err = w.s3.GetObjectWithContext(ctx, input, opts...) // a nil response must be captured as an error (if no error is provided) if out == nil && err == nil { err = nilRespError("GetObjectWithContext") } return err }, ) return out, err } func (w *S3wrapper) CreateMultipartUploadWithContext(ctx aws.Context, input *s3.CreateMultipartUploadInput, opts ...request.Option) (*s3.CreateMultipartUploadOutput, error) { var out *s3.CreateMultipartUploadOutput err := w.waitRetryNotify( ctx, func() error { var err error out, err = w.s3.CreateMultipartUploadWithContext(ctx, input, opts...) // a nil response must be captured as an error (if no error is provided) if out == nil && err == nil { err = nilRespError("CreateMultipartUploadWithContext") } return err }, ) return out, err } func (w *S3wrapper) ListMultipartUploadsWithContext(ctx aws.Context, input *s3.ListMultipartUploadsInput, opts ...request.Option) (*s3.ListMultipartUploadsOutput, error) { var out *s3.ListMultipartUploadsOutput err := w.waitRetryNotify( ctx, func() error { var err error out, err = w.s3.ListMultipartUploadsWithContext(ctx, input, opts...) // a nil response must be captured as an error (if no error is provided) if out == nil && err == nil { err = nilRespError("ListMultipartUploadsWithContext") } return err }, ) return out, err } func (w *S3wrapper) HeadObjectWithContext(ctx aws.Context, input *s3.HeadObjectInput, opts ...request.Option) (*s3.HeadObjectOutput, error) { var out *s3.HeadObjectOutput err := w.waitRetryNotify( ctx, func() error { var err error out, err = w.s3.HeadObjectWithContext(ctx, input, opts...) // a nil response must be captured as an error (if no error is provided) if out == nil && err == nil { err = nilRespError("HeadObjectWithContext") } return err }, ) return out, err } func (w *S3wrapper) ListPartsWithContext(ctx aws.Context, input *s3.ListPartsInput, opts ...request.Option) (*s3.ListPartsOutput, error) { var out *s3.ListPartsOutput err := w.waitRetryNotify( ctx, func() error { var err error out, err = w.s3.ListPartsWithContext(ctx, input, opts...) // make sure we never have a situation where `IsTruncated` is empty if we have a response if out != nil { if out.IsTruncated == nil { out.IsTruncated = new(bool) } } // a nil response must be captured as an error (if no error is provided) if out == nil && err == nil { err = nilRespError("ListPartsWithContext") } return err }, ) return out, err } func (w *S3wrapper) ListObjectsV2WithContext(ctx aws.Context, input *s3.ListObjectsV2Input, opts ...request.Option) (*s3.ListObjectsV2Output, error) { var out *s3.ListObjectsV2Output err := w.waitRetryNotify( ctx, func() error { var err error out, err = w.s3.ListObjectsV2WithContext(ctx, input, opts...) // a nil response must be captured as an error (if no error is provided) if out == nil && err == nil { err = nilRespError("ListObjectsV2WithContext") } return err }, ) return out, err } func (w *S3wrapper) CopyObjectWithContext(ctx aws.Context, input *s3.CopyObjectInput, opts ...request.Option) (*s3.CopyObjectOutput, error) { var out *s3.CopyObjectOutput err := w.waitRetryNotify( ctx, func() error { var err error out, err = w.s3.CopyObjectWithContext(ctx, input, opts...) // a nil response must be captured as an error (if no error is provided) if out == nil && err == nil { err = nilRespError("CopyObjectWithContext") } return err }, ) return out, err } func (w *S3wrapper) UploadPartCopyWithContext(ctx aws.Context, input *s3.UploadPartCopyInput, opts ...request.Option) (*s3.UploadPartCopyOutput, error) { var out *s3.UploadPartCopyOutput err := w.waitRetryNotify( ctx, func() error { var err error out, err = w.s3.UploadPartCopyWithContext(ctx, input, opts...) // a nil response must be captured as an error (if no error is provided) if out == nil && err == nil { err = nilRespError("UploadPartCopyWithContext") } return err }, ) return out, err } func (w *S3wrapper) CompleteMultipartUploadWithContext(ctx aws.Context, input *s3.CompleteMultipartUploadInput, opts ...request.Option) (*s3.CompleteMultipartUploadOutput, error) { var out *s3.CompleteMultipartUploadOutput err := w.waitRetryNotify( ctx, func() error { var err error out, err = w.s3.CompleteMultipartUploadWithContext(ctx, input, opts...) // a nil response must be captured as an error (if no error is provided) if out == nil && err == nil { err = nilRespError("CompleteMultipartUploadWithContext") } return err }, ) return out, err } func (w *S3wrapper) DeleteObjectsWithContext(ctx aws.Context, input *s3.DeleteObjectsInput, opts ...request.Option) (*s3.DeleteObjectsOutput, error) { var out *s3.DeleteObjectsOutput f := func() error { var err error out, err = w.s3.DeleteObjectsWithContext(ctx, input, opts...) if err != nil { return err } // a nil response must be captured as an error (if no error is provided) if out == nil { return nilRespError("DeleteObjectsWithContext") } for _, e := range out.Errors { if e != nil { if slices.Contains(retryableErrors, *e.Code) { return errors.New(*e.Code) } } } return err } err := w.waitRetryNotify(ctx, f) if err != nil && !slices.Contains(retryableErrors, err.Error()) { return out, err } return out, nil } func (w *S3wrapper) GetObjectRequest(input *s3.GetObjectInput) (*request.Request, *s3.GetObjectOutput) { // This does not make network calls, no need to rate limit. return w.s3.GetObjectRequest(input) } func (w *S3wrapper) HeadObjectRequest(input *s3.HeadObjectInput) (*request.Request, *s3.HeadObjectOutput) { // This does not make network calls, no need to rate limit. return w.s3.HeadObjectRequest(input) } func (w *S3wrapper) ListObjectsV2PagesWithContext(ctx aws.Context, input *s3.ListObjectsV2Input, f func(*s3.ListObjectsV2Output, bool) bool, opts ...request.Option) error { return w.waitRetryNotify(ctx, func() error { return w.s3.ListObjectsV2PagesWithContext(ctx, input, f, opts...) }) } func (w *S3wrapper) AbortMultipartUploadWithContext(ctx aws.Context, input *s3.AbortMultipartUploadInput, opts ...request.Option) (*s3.AbortMultipartUploadOutput, error) { var out *s3.AbortMultipartUploadOutput err := w.waitRetryNotify( ctx, func() error { var err error out, err = w.s3.AbortMultipartUploadWithContext(ctx, input, opts...) // a nil response must be captured as an error (if no error is provided) if out == nil && err == nil { err = nilRespError("AbortMultipartUploadWithContext") } return err }, ) return out, err } func (w *S3wrapper) UploadPartWithContext(ctx aws.Context, input *s3.UploadPartInput, opts ...request.Option) (*s3.UploadPartOutput, error) { var out *s3.UploadPartOutput err := w.waitRetryNotify( ctx, func() error { var err error out, err = w.s3.UploadPartWithContext(ctx, input, opts...) // a nil response must be captured as an error (if no error is provided) if out == nil && err == nil { err = nilRespError("UploadPartWithContext") } return err }, ) return out, err } func (w *S3wrapper) waitRetryNotify(ctx aws.Context, f backoff.Operation) error { err := backoff.RetryNotify( func() error { if err := w.Wait(ctx); err != nil { return backoff.Permanent(err) } awsErr := f() return wrapAWSerr(awsErr) }, w.backoff(), w.notify, ) return err } // wrapAWSerr wraps the original error with backoff.Permanent if the error // should not be retried. func wrapAWSerr(e error) error { if e == nil { return nil } // Retry any request failures that are server errors. var reqErr awserr.RequestFailure if errors.As(e, &reqErr) { if reqErr.StatusCode() != http.StatusTooManyRequests && reqErr.StatusCode() < http.StatusInternalServerError && reqErr.Code() != request.ErrCodeSerialization { return backoff.Permanent(e) } return e } // Some retryable errors are not specifically awserr.RequestFailure, continue // evaluating errors to see if we can retry. // Don't attempt to backoff from errors that are known to be client errors. var awsErr awserr.Error if errors.As(e, &awsErr) { if awsErr.Code() == request.ErrCodeInvalidPresignExpire { return backoff.Permanent(e) } } return e } func nilRespError(s3API string) error { return fmt.Errorf("received a nil response for %q from s3", s3API) }