internal/offloading/sink.go:

package offloading

import (
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"time"

	"gitlab.com/gitlab-org/gitaly/v16/internal/backoff"
	"gocloud.dev/blob"
	"golang.org/x/sync/errgroup"

	_ "gocloud.dev/blob/azureblob" // register Azure driver
	_ "gocloud.dev/blob/fileblob"  // register file driver
	_ "gocloud.dev/blob/gcsblob"   // register Google Cloud driver
	_ "gocloud.dev/blob/memblob"   // register in-memory driver
	_ "gocloud.dev/blob/s3blob"    // register AWS S3 driver
)

var (
	// deletionGoroutineLimit caps the number of goroutines deleting objects in parallel.
	deletionGoroutineLimit = 5

	errMissingBucket = errors.New("missing bucket")
)

// Bucket is an interface to abstract the behavior of the gocloud.dev/blob Bucket type.
// This abstraction is especially useful when adding a customized bucket to intercept traffic
// or to modify the functionality for specific use cases.
type Bucket interface {
	Download(ctx context.Context, key string, w io.Writer, opts *blob.ReaderOptions) error
	Upload(ctx context.Context, key string, r io.Reader, opts *blob.WriterOptions) error
	List(opts *blob.ListOptions) *blob.ListIterator
	Delete(ctx context.Context, key string) error
	Attributes(ctx context.Context, key string) (*blob.Attributes, error)
	Close() error
}

// Iterator is an interface to abstract the behavior of the gocloud.dev/blob ListIterator type.
// This abstraction is especially useful when adding a customized bucket to intercept traffic
// or to modify the functionality for specific use cases.
type Iterator interface {
	Next(ctx context.Context) (*blob.ListObject, error)
}

// Sink is a wrapper around the storage bucket, providing an interface for
// operations on offloaded objects.
type Sink struct {
	overallTimeout  time.Duration
	bucket          Bucket
	backoffStrategy backoff.Strategy
	maxRetry        uint
	noRetry         bool
	retryTimeout    time.Duration
}

// NewSink creates a Sink from the given options. If some options are not specified,
// the function will use the default values for them.
func NewSink(bucket Bucket, options ...SinkOption) (*Sink, error) {
	if bucket == nil {
		return nil, errMissingBucket
	}

	var cfg sinkCfg
	for _, apply := range options {
		apply(&cfg)
	}

	sink := &Sink{
		overallTimeout:  cfg.overallTimeout,
		bucket:          bucket,
		backoffStrategy: cfg.backoffStrategy,
		maxRetry:        cfg.maxRetry,
		noRetry:         cfg.noRetry,
		retryTimeout:    cfg.retryTimeout,
	}
	// Fill in default values for missing options.
	sink.setDefaults()

	return sink, nil
}
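// The sketch below is illustrative and not part of the original file: it shows
// one way NewSink can be wired to the in-memory driver registered by the blank
// memblob import above, which is handy in tests. The helper name newMemorySink
// is hypothetical.
func newMemorySink(ctx context.Context) (*Sink, error) {
	// blob.OpenBucket resolves the "mem://" scheme against the drivers
	// registered via the blank imports and returns a *blob.Bucket, which
	// satisfies this file's Bucket interface.
	bucket, err := blob.OpenBucket(ctx, "mem://")
	if err != nil {
		return nil, fmt.Errorf("open in-memory bucket: %w", err)
	}
	// No options are passed, so setDefaults applies the default timeout,
	// backoff strategy, and retry count.
	return NewSink(bucket)
}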
// Upload uploads a file located at fullFilePath to the bucket under the specified prefix.
// The fullFilePath includes the file name, e.g. /tmp/foo.txt.
func (r *Sink) Upload(ctx context.Context, fullFilePath string, prefix string) (returnErr error) {
	ctx, cancel := context.WithTimeout(ctx, r.overallTimeout)
	defer cancel()

	file, err := os.Open(fullFilePath)
	if err != nil {
		return fmt.Errorf("open file: %w", err)
	}
	defer func() {
		if err := file.Close(); err != nil {
			returnErr = errors.Join(returnErr, fmt.Errorf("close file: %w", err))
		}
	}()

	objectKey := fmt.Sprintf("%s/%s", prefix, filepath.Base(fullFilePath))
	if err := r.withRetry(ctx, func(operationCtx context.Context) error {
		// Rewind so a retried attempt re-reads the file from the start rather
		// than from wherever a failed attempt stopped.
		if _, err := file.Seek(0, io.SeekStart); err != nil {
			return fmt.Errorf("seek file: %w", err)
		}
		return r.bucket.Upload(operationCtx, objectKey, file, &blob.WriterOptions{
			// 'no-store' - we don't want the offloaded blobs to be cached as the content could change,
			// so we always want fresh, up-to-date data.
			// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control#cacheability
			// 'no-transform' - disallows intermediaries from modifying the data.
			// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control#other
			CacheControl: "no-store, no-transform",
			ContentType:  "application/octet-stream",
		})
	}); err != nil {
		return fmt.Errorf("upload object %q: %w", objectKey, err)
	}

	return nil
}

// Download retrieves a file from the bucket and saves it to the specified location on the local file system.
// The objectKey is the key of the object in the bucket, which includes the prefix and
// object name (e.g., "prefix/my_object.idx"); fullFilePath is the full path on the local file system where the
// object will be saved, including the file name (e.g., "/tmp/foo.txt").
func (r *Sink) Download(ctx context.Context, objectKey string, fullFilePath string) (returnErr error) {
	ctx, cancel := context.WithTimeout(ctx, r.overallTimeout)
	defer cancel()

	file, err := os.Create(fullFilePath)
	if err != nil {
		return fmt.Errorf("create file: %w", err)
	}
	defer func() {
		err := file.Close()
		if returnErr == nil {
			// Downloading succeeded; check if we can close the file.
			if err != nil {
				returnErr = errors.Join(returnErr, fmt.Errorf("close file: %w", err))
			}
		} else {
			// Downloading failed; delete the file anyway.
			if err := os.Remove(fullFilePath); err != nil {
				returnErr = errors.Join(returnErr, fmt.Errorf("remove file when downloading failed: %w", err))
			}
		}
	}()

	if err := r.withRetry(ctx, func(operationCtx context.Context) error {
		// Reset the file before each attempt so a failed try doesn't leave
		// partial data in front of the next one.
		if _, err := file.Seek(0, io.SeekStart); err != nil {
			return fmt.Errorf("seek file: %w", err)
		}
		if err := file.Truncate(0); err != nil {
			return fmt.Errorf("truncate file: %w", err)
		}
		return r.bucket.Download(operationCtx, objectKey, file, nil)
	}); err != nil {
		return fmt.Errorf("download object: %w", err)
	}

	return nil
}

// List lists all objects in the bucket that have the specified prefix.
func (r *Sink) List(ctx context.Context, prefix string) (res []string, err error) {
	return listHandler(ctx, r, prefix, r.bucket.List)
}
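// The sketch below is illustrative and not part of the original file: it shows
// how the key layout produced by Upload (prefix + "/" + base name) feeds back
// into Download. The helper name roundTrip and the paths are hypothetical
// example values.
func roundTrip(ctx context.Context, sink *Sink) error {
	// Upload stores /tmp/foo.txt under the object key "objects/foo.txt".
	if err := sink.Upload(ctx, "/tmp/foo.txt", "objects"); err != nil {
		return fmt.Errorf("upload: %w", err)
	}
	// Download expects the full object key, including the prefix.
	if err := sink.Download(ctx, "objects/foo.txt", "/tmp/foo-copy.txt"); err != nil {
		return fmt.Errorf("download: %w", err)
	}
	return nil
}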
type listFunc[T Iterator] func(opts *blob.ListOptions) T

// listHandler is responsible for loading the listFunc and executing it to perform the list operation.
//
// Generics are used here because the List signature in the bucket interface uses ListIterator,
// which is a concrete type rather than an interface. If we need to intercept or modify ListIterator's behavior,
// such as adding delays or intentionally returning errors, generics provide the necessary flexibility to achieve this.
func listHandler[T Iterator](ctx context.Context, r *Sink, prefix string, listFunc listFunc[T]) (res []string, err error) {
	prefix = filepath.Clean(prefix)

	// listExecutor is where we call listFunc to perform the list operation.
	// It is wrapped in the retry loop below.
	listExecutor := func(operationCtx context.Context) ([]string, error) {
		var listErr error
		var attrs *blob.ListObject

		it := listFunc(&blob.ListOptions{
			Prefix:    prefix + "/",
			Delimiter: "/",
		})

		objects := make([]string, 0)
		for {
			attrs, listErr = it.Next(operationCtx)
			if listErr != nil {
				if errors.Is(listErr, io.EOF) {
					return objects, nil
				}
				return []string{}, fmt.Errorf("list object: %w", listErr)
			}
			// Exclude the prefix "folder" itself.
			if attrs != nil && attrs.Key != prefix+"/" {
				objects = append(objects, filepath.Base(attrs.Key))
			}
		}
	}

	ctx, cancel := context.WithTimeout(ctx, r.overallTimeout)
	defer cancel()

	if err := r.withRetry(ctx, func(operationCtx context.Context) error {
		res, err = listExecutor(operationCtx)
		return err
	}); err != nil {
		return []string{}, fmt.Errorf("list objects: %w", err)
	}

	return res, nil
}

// DeleteObjects attempts to delete the specified objects within the given prefix.
// The result is a map of objectKey to error. Successfully deleted objects will not appear in the map.
func (r *Sink) DeleteObjects(ctx context.Context, prefix string, objectNames []string) map[string]error {
	res := make(map[string]error)
	if len(objectNames) == 0 {
		return res
	}

	ctx, cancel := context.WithTimeout(ctx, r.overallTimeout)
	defer cancel()

	type deleteResult struct {
		object string
		err    error
	}
	resCh := make(chan deleteResult)

	group := errgroup.Group{}
	group.SetLimit(deletionGoroutineLimit)
	for _, object := range objectNames {
		object := object // capture the loop variable for the goroutine; required on Go < 1.22
		group.Go(func() error {
			obj := fmt.Sprintf("%s/%s", prefix, filepath.Base(object))
			if err := r.withRetry(ctx, func(operationCtx context.Context) error {
				return r.bucket.Delete(operationCtx, obj)
			}); err != nil {
				resCh <- deleteResult{object: obj, err: err}
			}
			return nil
		})
	}

	go func() {
		// Ignore the error here since we use resCh to surface errors from the
		// operations; the functions passed to group.Go() never return a non-nil error.
		_ = group.Wait()
		close(resCh)
	}()

	for delRes := range resCh {
		res[delRes.object] = delRes.err
	}

	return res
}

// setDefaults fills in default values for missing options.
func (r *Sink) setDefaults() {
	// Retry is wanted but not configured.
	if !r.noRetry && r.maxRetry == 0 {
		r.maxRetry = defaultMaxRetry
	}
	if r.overallTimeout == 0 {
		r.overallTimeout = defaultOverallTimeout
	}
	if r.backoffStrategy == nil {
		r.backoffStrategy = defaultBackoffStrategy
	}
	if r.retryTimeout == 0 {
		r.retryTimeout = defaultRetryTimeout
	}
}

// withRetry retries the given operation until it succeeds or the maximum number of retries is reached.
func (r *Sink) withRetry(ctx context.Context, op func(context.Context) error) error {
	var err error
	for retry := uint(0); retry <= r.maxRetry; {
		err = func() error {
			// Each attempt gets its own timeout, bounded by the caller's overall context.
			operationCtx, operationCancel := context.WithTimeout(ctx, r.retryTimeout)
			defer operationCancel()
			return op(operationCtx)
		}()
		if err == nil || r.noRetry {
			break
		}

		timer := time.NewTimer(r.backoffStrategy.Backoff(retry))
		select {
		case <-ctx.Done():
			timer.Stop()
			return fmt.Errorf("backoff aborted: %w", err)
		case <-timer.C:
			// The backoff timer expired; issue another attempt.
			retry++
		}
	}
	return err
}
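// The sketch below is illustrative and not part of the original file: it shows
// the kind of traffic interception the Bucket interface is meant to enable.
// countingBucket is a hypothetical wrapper that counts Delete calls while the
// embedded Bucket supplies every other method unchanged. Note that
// DeleteObjects invokes Delete from multiple goroutines, so a real interceptor
// would need to synchronize the counter (e.g. with sync/atomic).
type countingBucket struct {
	Bucket
	deletes int
}

// Delete increments the counter and then delegates to the wrapped bucket.
func (b *countingBucket) Delete(ctx context.Context, key string) error {
	b.deletes++ // unsynchronized; see the note above
	return b.Bucket.Delete(ctx, key)
}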