registry/storage/driver/gcs/gcs.go (718 lines of code) (raw):

// Package gcs provides a storagedriver.StorageDriver implementation to // store blobs in Google cloud storage. // // This package leverages the google.golang.org/cloud/storage client library // for interfacing with gcs. // // Because gcs is a key, value store the Stat call does not support last modification // time for directories (directories are an abstraction for key, value stores) // // Note that the contents of incomplete uploads are not accessible even though // Stat returns their length package gcs import ( "bytes" "context" // nolint: revive,gosec // imports-blocklist "crypto/md5" "errors" "fmt" "io" "net/http" "net/url" "runtime/debug" "sort" "strconv" "strings" "sync" "time" "cloud.google.com/go/storage" storagedriver "github.com/docker/distribution/registry/storage/driver" "github.com/docker/distribution/registry/storage/driver/base" "github.com/hashicorp/go-multierror" "github.com/sirupsen/logrus" "google.golang.org/api/googleapi" "google.golang.org/api/iterator" ) // New constructs a new driver func New(params *driverParameters) (storagedriver.StorageDriver, error) { rootDirectory := strings.Trim(params.rootDirectory, "/") if rootDirectory != "" { rootDirectory += "/" } if params.chunkSize <= 0 || params.chunkSize%minChunkSize != 0 { return nil, fmt.Errorf("invalid chunksize: %d is not a positive multiple of %d", params.chunkSize, minChunkSize) } d := &driver{ bucket: params.bucket, rootDirectory: rootDirectory, email: params.email, privateKey: params.privateKey, client: params.client, storageClient: params.storageClient, chunkSize: params.chunkSize, parallelWalk: params.parallelWalk, } return &Wrapper{ baseEmbed: baseEmbed{ Base: base.Base{ StorageDriver: base.NewRegulator(d, params.maxConcurrency), }, }, }, nil } // driver is a storagedriver.StorageDriver implementation backed by GCS // Objects are stored at absolute keys in the provided bucket. 
type driver struct { client *http.Client storageClient *storage.Client bucket string email string privateKey []byte rootDirectory string chunkSize int64 parallelWalk bool } func (*driver) Name() string { return driverName } // GetContent retrieves the content stored at "path" as a []byte. // This should primarily be used for small objects. func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) { name := d.pathToKey(path) var rc io.ReadCloser err := retry(func() error { var err error rc, err = d.storageClient.Bucket(d.bucket).Object(name).NewReader(ctx) return err }) if errors.Is(err, storage.ErrObjectNotExist) { return nil, storagedriver.PathNotFoundError{Path: path, DriverName: driverName} } if err != nil { return nil, err } defer rc.Close() p, err := io.ReadAll(rc) if err != nil { return nil, err } return p, nil } // PutContent stores the []byte content at a location designated by "path". // This should primarily be used for small objects. func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error { return retry(func() error { wc := d.storageClient.Bucket(d.bucket).Object(d.pathToKey(path)).NewWriter(ctx) wc.ContentType = "application/octet-stream" // nolint: gosec h := md5.New() _, err := h.Write(contents) if err != nil { return fmt.Errorf("calculating hash: %w", err) } wc.MD5 = h.Sum(nil) if len(contents) == 0 { logrus.WithFields(logrus.Fields{ "path": path, "length": len(contents), "stack": string(debug.Stack()), }).Info("PutContent called with 0 bytes") } return putContentsClose(wc, contents) }) } // Reader retrieves an io.ReadCloser for the content stored at "path" // with a given byte offset. // May be used to resume reading a stream by providing a nonzero offset. 
func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { res, err := getObject(d.client, d.bucket, d.pathToKey(path), offset) if err != nil { var gcsErr *googleapi.Error if !errors.As(err, &gcsErr) { return nil, err } if gcsErr.Code == http.StatusNotFound { return nil, storagedriver.PathNotFoundError{Path: path, DriverName: driverName} } if gcsErr.Code == http.StatusRequestedRangeNotSatisfiable { obj, err := storageStatObject(ctx, d.storageClient, d.bucket, d.pathToKey(path)) if err != nil { return nil, err } if offset == obj.Size { return io.NopCloser(bytes.NewReader(make([]byte, 0))), nil } return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset, DriverName: driverName} } } if res.Header.Get("Content-Type") == uploadSessionContentType { defer res.Body.Close() return nil, storagedriver.PathNotFoundError{Path: path, DriverName: driverName} } return res.Body, nil } func storageDeleteObject(ctx context.Context, client *storage.Client, bucket, name string) error { return retry(func() error { return client.Bucket(bucket).Object(name).Delete(ctx) }) } func storageStatObject(ctx context.Context, client *storage.Client, bucket, name string) (*storage.ObjectAttrs, error) { var obj *storage.ObjectAttrs err := retry(func() error { var err error obj, err = client.Bucket(bucket).Object(name).Attrs(ctx) return err }) return obj, err } func storageListObjects(ctx context.Context, client *storage.Client, bucket string, q *storage.Query) (*storage.ObjectIterator, error) { var objs *storage.ObjectIterator err := retry(func() error { var err error objs = client.Bucket(bucket).Objects(ctx, q) return err }) return objs, err } func storageCopyObject(ctx context.Context, client *storage.Client, srcBucket, srcName, destBucket, destName string) (*storage.ObjectAttrs, error) { var obj *storage.ObjectAttrs err := retry(func() error { var err error src := client.Bucket(srcBucket).Object(srcName) dst := 
client.Bucket(destBucket).Object(destName) obj, err = dst.CopierFrom(src).Run(ctx) return err }) return obj, err } func getObject(client *http.Client, bucket, name string, offset int64) (*http.Response, error) { // copied from google.golang.org/cloud/storage#NewReader : // to set the additional "Range" header u := &url.URL{ Scheme: "https", Host: "storage.googleapis.com", Path: fmt.Sprintf("/%s/%s", bucket, name), } req, err := http.NewRequest(http.MethodGet, u.String(), nil) if err != nil { return nil, fmt.Errorf("creating new http request: %w", err) } if offset > 0 { req.Header.Set("Range", fmt.Sprintf("bytes=%v-", offset)) } var res *http.Response err = retry(func() error { var err error // nolint: bodyclose // body is closed bit later in the code res, err = client.Do(req) return err }) if err != nil { return nil, fmt.Errorf("executing http request: %w", err) } if err := googleapi.CheckMediaResponse(res); err != nil { _ = res.Body.Close() return nil, fmt.Errorf("checking media response: %w", err) } return res, nil } func putContentsClose(wc *storage.Writer, contents []byte) error { size := len(contents) var nn int var err error for nn < size { n, err := wc.Write(contents[nn:size]) nn += n if err != nil { break } } if err != nil { // nolint: staticcheck,gosec // this needs some refactoring and a deeper research wc.CloseWithError(err) return err } return wc.Close() } func putChunk(client *http.Client, sessionURI string, chunk []byte, from, totalSize int64) (int64, error) { bytesPut := int64(0) err := retry(func() error { req, err := http.NewRequest(http.MethodPut, sessionURI, bytes.NewReader(chunk)) if err != nil { return err } length := int64(len(chunk)) to := from + length - 1 size := "*" if totalSize >= 0 { size = strconv.FormatInt(totalSize, 10) } req.Header.Set("Content-Type", "application/octet-stream") if from == to+1 { req.Header.Set("Content-Range", fmt.Sprintf("bytes */%v", size)) } else { req.Header.Set("Content-Range", fmt.Sprintf("bytes %v-%v/%v", 
from, to, size)) } req.Header.Set("Content-Length", strconv.FormatInt(length, 10)) resp, err := client.Do(req) if err != nil { return err } defer resp.Body.Close() if totalSize < 0 && resp.StatusCode == 308 { groups := rangeHeader.FindStringSubmatch(resp.Header.Get("Range")) end, err := strconv.ParseInt(groups[2], 10, 64) if err != nil { return err } bytesPut = end - from + 1 return nil } err = googleapi.CheckMediaResponse(resp) if err != nil { return err } bytesPut = to - from + 1 return nil }) return bytesPut, err } // Writer returns a FileWriter which will store the content written to it // at the location designated by "path" after the call to Commit. func (d *driver) Writer(_ context.Context, path string, doAppend bool) (storagedriver.FileWriter, error) { writer := &writer{ client: d.client, storageClient: d.storageClient, bucket: d.bucket, name: d.pathToKey(path), buffer: make([]byte, d.chunkSize), } if doAppend { err := writer.init(path) if err != nil { return nil, err } } return writer, nil } // Stat retrieves the FileInfo for the given path, including the current // size in bytes and the creation time. 
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) { var fi storagedriver.FileInfoFields // try to get as file obj, err := storageStatObject(ctx, d.storageClient, d.bucket, d.pathToKey(path)) if err == nil { if obj.ContentType == uploadSessionContentType { return nil, storagedriver.PathNotFoundError{Path: path, DriverName: driverName} } fi = storagedriver.FileInfoFields{ Path: path, Size: obj.Size, ModTime: obj.Updated, IsDir: false, } return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil } // try to get as folder dirpath := d.pathToDirKey(path) query := &storage.Query{} query.Prefix = dirpath it, err := storageListObjects(ctx, d.storageClient, d.bucket, query) if err != nil { return nil, err } attrs, err := it.Next() if err == iterator.Done { return nil, storagedriver.PathNotFoundError{Path: path, DriverName: driverName} } if err != nil { return nil, err } fi = storagedriver.FileInfoFields{ Path: path, IsDir: true, } if attrs.Name == dirpath { fi.Size = attrs.Size fi.ModTime = attrs.Updated } return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil } // List returns a list of the objects that are direct descendants of the // given path. func (d *driver) List(ctx context.Context, path string) ([]string, error) { query := &storage.Query{} query.Delimiter = "/" query.Prefix = d.pathToDirKey(path) if query.Prefix == "/" { query.Prefix = "" } list := make([]string, 0, 64) it, err := storageListObjects(ctx, d.storageClient, d.bucket, query) if err != nil { return nil, err } for { attrs, err := it.Next() if err == iterator.Done { break } if err != nil { return nil, err } // GCS does not guarantee strong consistency between // DELETE and LIST operations. 
Check that the object is not deleted, // and filter out any objects with a non-zero time-deleted if attrs.Deleted.IsZero() && attrs.ContentType != uploadSessionContentType { var name string if len(attrs.Prefix) > 0 { name = attrs.Prefix } else { name = attrs.Name } list = append(list, d.keyToPath(name)) } } if path != "/" && len(list) == 0 { // Treat empty response as missing directory, since we don't actually // have directories in Google Cloud Storage. return nil, storagedriver.PathNotFoundError{Path: path, DriverName: driverName} } return list, nil } // Move moves an object stored at sourcePath to destPath, removing the // original object. func (d *driver) Move(ctx context.Context, sourcePath, destPath string) error { _, err := storageCopyObject(ctx, d.storageClient, d.bucket, d.pathToKey(sourcePath), d.bucket, d.pathToKey(destPath)) if err != nil { var gerr *googleapi.Error if errors.As(err, &gerr) { if gerr.Code == http.StatusNotFound { return storagedriver.PathNotFoundError{Path: sourcePath, DriverName: driverName} } } return err } err = storageDeleteObject(ctx, d.storageClient, d.bucket, d.pathToKey(sourcePath)) // if deleting the file fails, log the error, but do not fail; the file was successfully copied, // and the original should eventually be cleaned when purging the uploads folder. if err != nil { logrus.Infof("error deleting file: %v due to %v", sourcePath, err) } return nil } // listAll recursively lists all names of objects stored at "prefix" and its subpaths. func (d *driver) listAll(ctx context.Context, prefix string) ([]string, error) { list := make([]string, 0, 64) query := &storage.Query{} query.Prefix = prefix query.Versions = false it, err := storageListObjects(ctx, d.storageClient, d.bucket, query) if err != nil { return nil, err } for { attrs, err := it.Next() if err == iterator.Done { break } if err != nil { return nil, err } // GCS does not guarantee strong consistency between // DELETE and LIST operations. 
Check that the object is not deleted, // and filter out any objects with a non-zero time-deleted if attrs.Deleted.IsZero() { list = append(list, attrs.Name) } } return list, nil } // Delete recursively deletes all objects stored at "path" and its subpaths. func (d *driver) Delete(ctx context.Context, path string) error { prefix := d.pathToDirKey(path) keys, err := d.listAll(ctx, prefix) if err != nil { return err } if len(keys) > 0 { sort.Sort(sort.Reverse(sort.StringSlice(keys))) for _, key := range keys { err := storageDeleteObject(ctx, d.storageClient, d.bucket, key) // GCS only guarantees eventual consistency, so listAll might return // paths that no longer exist. If this happens, just ignore any not // found error if errors.Is(err, storage.ErrObjectNotExist) { err = nil } if err != nil { return err } } return nil } err = storageDeleteObject(ctx, d.storageClient, d.bucket, d.pathToKey(path)) if errors.Is(err, storage.ErrObjectNotExist) { return storagedriver.PathNotFoundError{Path: path, DriverName: driverName} } return err } // DeleteFiles deletes a set of files concurrently, using a separate goroutine for each, up to a maximum of // maxDeleteConcurrency. Returns the number of successfully deleted files and any errors. This method is idempotent, no // error is returned if a file does not exist. 
func (d *driver) DeleteFiles(ctx context.Context, paths []string) (int, error) { // collect errors from concurrent requests var errs error errCh := make(chan error) errDone := make(chan struct{}) go func() { for err := range errCh { if err != nil { errs = multierror.Append(errs, err) } } errDone <- struct{}{} }() // count the number of successfully deleted files across concurrent requests count := 0 countCh := make(chan struct{}) countDone := make(chan struct{}) go func() { for range countCh { count++ } countDone <- struct{}{} }() var wg sync.WaitGroup // limit the number of active goroutines to maxDeleteConcurrency semaphore := make(chan struct{}, maxDeleteConcurrency) for _, path := range paths { // block if there are maxDeleteConcurrency goroutines semaphore <- struct{}{} wg.Add(1) go func(p string) { defer func() { wg.Done() // signal free spot for another goroutine <-semaphore }() if err := storageDeleteObject(ctx, d.storageClient, d.bucket, d.pathToKey(p)); err != nil { if !errors.Is(err, storage.ErrObjectNotExist) { errCh <- err return } } // count successfully deleted files countCh <- struct{}{} }(path) } wg.Wait() close(semaphore) close(errCh) <-errDone close(countCh) <-countDone return count, errs } // URLFor returns a URL which may be used to retrieve the content stored at // the given path, possibly using the given options. 
// Returns ErrUnsupportedMethod if this driver has no privateKey func (d *driver) URLFor(_ context.Context, path string, options map[string]any) (string, error) { if d.privateKey == nil { return "", storagedriver.ErrUnsupportedMethod{DriverName: driverName} } name := d.pathToKey(path) methodString := http.MethodGet method, ok := options["method"] if ok { methodString, ok = method.(string) if !ok || (methodString != http.MethodGet && methodString != http.MethodHead) { return "", storagedriver.ErrUnsupportedMethod{DriverName: driverName} } } expiresTime := systemClock.Now().Add(20 * time.Minute) expires, ok := options["expiry"] if ok { et, ok := expires.(time.Time) if ok { expiresTime = et } } opts := &storage.SignedURLOptions{ GoogleAccessID: d.email, PrivateKey: d.privateKey, Method: methodString, Expires: expiresTime, QueryParameters: storagedriver.CustomParams(options, customParamKeys), Scheme: storage.SigningSchemeV4, } return storage.SignedURL(d.bucket, name, opts) } // Walk traverses a filesystem defined within driver, starting // from the given path, calling f on each file func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error { return storagedriver.WalkFallback(ctx, d, path, f) } // WalkParallel traverses a filesystem defined within driver in parallel, starting // from the given path, calling f on each file. func (d *driver) WalkParallel(ctx context.Context, path string, f storagedriver.WalkFn) error { // If the ParallelWalk feature flag is not set, fall back to standard sequential walk. 
if !d.parallelWalk { return d.Walk(ctx, path, f) } return storagedriver.WalkFallbackParallel(ctx, d, maxWalkConcurrency, path, f) } func startSession(client *http.Client, bucket, name string) (uri string, err error) { u := &url.URL{ Scheme: "https", Host: "www.googleapis.com", Path: fmt.Sprintf("/upload/storage/v1/b/%v/o", bucket), RawQuery: fmt.Sprintf("uploadType=resumable&name=%v", name), } err = retry(func() error { req, err := http.NewRequest(http.MethodPost, u.String(), nil) if err != nil { return err } req.Header.Set("X-Upload-Content-Type", "application/octet-stream") req.Header.Set("Content-Length", "0") resp, err := client.Do(req) if err != nil { return err } defer resp.Body.Close() err = googleapi.CheckMediaResponse(resp) if err != nil { return err } uri = resp.Header.Get("Location") return nil }) return uri, err } func (d *driver) pathToKey(path string) string { return strings.TrimSpace(strings.TrimRight(d.rootDirectory+strings.TrimLeft(path, "/"), "/")) } func (d *driver) pathToDirKey(path string) string { return d.pathToKey(path) + "/" } func (d *driver) keyToPath(key string) string { return "/" + strings.Trim(strings.TrimPrefix(key, d.rootDirectory), "/") } type writer struct { client *http.Client storageClient *storage.Client bucket string name string size int64 offset int64 closed bool committed bool canceled bool sessionURI string buffer []byte buffSize int64 } // Cancel removes any written content from this FileWriter. 
func (w *writer) Cancel() error {
	// A closed or committed writer cannot be canceled.
	if w.closed {
		return storagedriver.ErrAlreadyClosed
	} else if w.committed {
		return storagedriver.ErrAlreadyCommited
	}
	w.canceled = true

	// Delete the target object; a missing object counts as success, whether
	// it is reported by the client library or as a raw 404 API error.
	err := storageDeleteObject(context.Background(), w.storageClient, w.bucket, w.name)
	if err != nil {
		if errors.Is(err, storage.ErrObjectNotExist) {
			return nil
		}

		var gerr *googleapi.Error
		if errors.As(err, &gerr) && gerr.Code == http.StatusNotFound {
			return nil
		}

		return fmt.Errorf("deleting object while canceling writer: %w", err)
	}

	return nil
}

// Close marks the writer as closed without finalizing the object.
//
// Unless the writer was already canceled or committed, it uploads the
// buffered data that forms whole multiples of minChunkSize and then stores
// the remaining buffered bytes in an object with the upload-session content
// type, recording the session URI and the uploaded offset in its metadata.
// Closing an already closed writer returns storagedriver.ErrAlreadyClosed.
func (w *writer) Close() error {
	if w.closed {
		return storagedriver.ErrAlreadyClosed
	}
	w.closed = true

	if w.canceled {
		// NOTE(prozlach): If the writer has already been canceled, then there
		// is nothing to flush to the backend as the target file has already
		// been deleted.
		return nil
	}
	if w.committed {
		// we are done already, return early
		return nil
	}

	err := w.writeChunk()
	if err != nil {
		return fmt.Errorf("writing chunk: %w", err)
	}

	// Copy the remaining bytes from the buffer to the upload session
	// Normally buffSize will be smaller than minChunkSize. However, in the
	// unlikely event that the upload session failed to start, this number could be higher.
	// In this case we can safely clip the remaining bytes to the minChunkSize
	if w.buffSize > minChunkSize {
		w.buffSize = minChunkSize
	}

	// commit the writes by updating the upload session
	err = retry(func() error {
		// NOTE(review): this local variable shadows the context package
		// for the rest of the closure.
		context := context.Background()
		wc := w.storageClient.Bucket(w.bucket).Object(w.name).NewWriter(context)
		wc.ContentType = uploadSessionContentType
		wc.Metadata = map[string]string{
			"Session-URI": w.sessionURI,
			"Offset":      strconv.FormatInt(w.offset, 10),
		}
		return putContentsClose(wc, w.buffer[0:w.buffSize])
	})
	if err != nil {
		return fmt.Errorf("writing contents while closing writer: %w", err)
	}
	w.size = w.offset + w.buffSize
	w.buffSize = 0
	return nil
}

// Commit flushes all content written to this FileWriter and makes it
// available for future calls to StorageDriver.GetContent and
// StorageDriver.Reader.
//
// It returns an error if the writer was already closed, committed or
// canceled. The committed flag is set before any upload is attempted.
func (w *writer) Commit() error {
	switch {
	case w.closed:
		return storagedriver.ErrAlreadyClosed
	case w.committed:
		return storagedriver.ErrAlreadyCommited
	case w.canceled:
		return storagedriver.ErrAlreadyCanceled
	}
	w.committed = true

	// no session started yet just perform a simple upload
	if w.sessionURI == "" {
		err := retry(func() error {
			// NOTE(review): this local variable shadows the context package
			// for the rest of the closure.
			context := context.Background()
			wc := w.storageClient.Bucket(w.bucket).Object(w.name).NewWriter(context)
			wc.ContentType = "application/octet-stream"
			return putContentsClose(wc, w.buffer[0:w.buffSize])
		})
		if err != nil {
			return err
		}
		w.size = w.offset + w.buffSize
		w.buffSize = 0
		return nil
	}
	// size is the final object size: bytes already uploaded plus what is
	// still buffered. Passing it to putChunk marks the upload as final.
	size := w.offset + w.buffSize
	// nn counts how many buffered bytes have been uploaded so far.
	var nn int64
	// loop must be performed at least once to ensure the file is committed even when
	// the buffer is empty
	for {
		n, err := putChunk(w.client, w.sessionURI, w.buffer[nn:w.buffSize], w.offset, size)
		nn += n
		w.offset += n
		w.size = w.offset
		if err != nil {
			// Keep only the bytes that were not uploaded, moved to the front
			// of the buffer.
			w.buffSize = int64(copy(w.buffer, w.buffer[nn:w.buffSize])) // nolint: gosec // copy() is always going to be non-negative
			return err
		}
		if nn == w.buffSize {
			break
		}
	}
	w.buffSize = 0
	return nil
}

// writeChunk uploads the largest prefix of the buffer whose length is a
// multiple of minChunkSize, starting the upload session first when none
// exists yet. Bytes that were not uploaded are moved to the front of the
// buffer. It does nothing when less than minChunkSize bytes are buffered.
func (w *writer) writeChunk() error {
	var err error
	// chunks can be uploaded only in multiples of minChunkSize
	// chunkSize is a multiple of minChunkSize less than or equal to buffSize
	chunkSize := w.buffSize - (w.buffSize % minChunkSize)
	if chunkSize == 0 {
		return nil
	}
	// if there is no sessionURI yet, obtain one by starting the session
	if w.sessionURI == "" {
		w.sessionURI, err = startSession(w.client, w.bucket, w.name)
	}
	if err != nil {
		return err
	}
	// A total size of -1 tells putChunk the final size is not yet known.
	nn, err := putChunk(w.client, w.sessionURI, w.buffer[0:chunkSize], w.offset, -1)
	w.offset += nn
	if w.offset > w.size {
		w.size = w.offset
	}
	// shift the remaining bytes to the start of the buffer
	w.buffSize = int64(copy(w.buffer, w.buffer[nn:w.buffSize])) // nolint: gosec // copy() is always going to be non-negative
	return err
}

// Write copies p into the writer's buffer, uploading a chunk via writeChunk
// every time the buffer becomes full. It returns an error if the writer was
// already closed, committed or canceled.
func (w *writer) Write(p []byte) (int, error) {
	switch {
	case w.closed:
		return 0, storagedriver.ErrAlreadyClosed
	case w.committed:
		return 0, storagedriver.ErrAlreadyCommited
	case w.canceled:
		return 0, storagedriver.ErrAlreadyCanceled
	}

	var err error
	var nn int

	for nn < len(p) {
		n := copy(w.buffer[w.buffSize:], p[nn:])
		w.buffSize += int64(n) // nolint: gosec // copy() is always going to be non-negative

		if w.buffSize == int64(cap(w.buffer)) {
			err = w.writeChunk()
			if err != nil {
				// NOTE(review): the n bytes copied above are not added to the
				// returned count on this path — confirm callers expect that.
				break
			}
		}
		nn += n
	}
	return nn, err
}

// Size returns the number of bytes written to this FileWriter.
func (w *writer) Size() int64 { return w.size } func (w *writer) init(path string) error { res, err := getObject(w.client, w.bucket, w.name, 0) if err != nil { var gcsErr *googleapi.Error if errors.As(err, &gcsErr) && gcsErr.Code == http.StatusNotFound { return storagedriver.PathNotFoundError{Path: path, DriverName: driverName} } return err } defer res.Body.Close() if res.Header.Get("Content-Type") != uploadSessionContentType { return storagedriver.PathNotFoundError{Path: path, DriverName: driverName} } offset, err := strconv.ParseInt(res.Header.Get("X-Goog-Meta-Offset"), 10, 64) if err != nil { return err } buffer, err := io.ReadAll(res.Body) if err != nil { return err } w.sessionURI = res.Header.Get("X-Goog-Meta-Session-URI") w.buffSize = int64(copy(w.buffer, buffer)) // nolint: gosec // copy() is always going to be non-negative w.offset = offset w.size = offset + w.buffSize return nil }