// Package gcs provides a storagedriver.StorageDriver implementation to
// store blobs in Google Cloud Storage.
//
// This package leverages the cloud.google.com/go/storage client library
// for interfacing with GCS.
//
// Because GCS is a key-value store, the Stat call does not support last
// modification time for directories (directories are an abstraction for
// key-value stores).
//
// Note that the contents of incomplete uploads are not accessible even though
// Stat returns their length.
package gcs
import (
"bytes"
"context"
"sync"
"sync/atomic"
// nolint: revive,gosec // imports-blocklist
"crypto/md5"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"runtime/debug"
"strconv"
"strings"
"time"
"cloud.google.com/go/storage"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/base"
"github.com/hashicorp/go-multierror"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
)
// NewNext constructs a new driver
func NewNext(params *driverParameters) (storagedriver.StorageDriver, error) {
rootDirectory := strings.Trim(params.rootDirectory, "/")
if rootDirectory != "" {
rootDirectory += "/"
}
if params.chunkSize <= 0 || params.chunkSize%minChunkSize != 0 {
return nil, fmt.Errorf("invalid chunksize: %d is not a positive multiple of %d", params.chunkSize, minChunkSize)
}
d := &driverNext{
bucket: params.storageClient.Bucket(params.bucket),
rootDirectory: rootDirectory,
email: params.email,
privateKey: params.privateKey,
client: params.client,
chunkSize: params.chunkSize,
parallelWalk: params.parallelWalk,
}
return &Wrapper{
baseEmbed: baseEmbed{
Base: base.Base{
StorageDriver: base.NewRegulator(d, params.maxConcurrency),
},
},
}, nil
}
// driverNext is a storagedriver.StorageDriver implementation backed by GCS.
// Objects are stored at absolute keys in the provided bucket.
type driverNext struct {
client *http.Client
bucket *storage.BucketHandle
email string
privateKey []byte
rootDirectory string
chunkSize int64
parallelWalk bool
}
func (*driverNext) Name() string {
return driverName
}
// GetContent retrieves the content stored at "path" as a []byte.
// This should primarily be used for small objects.
func (d *driverNext) GetContent(ctx context.Context, path string) ([]byte, error) {
name := d.pathToKey(path)
var rc io.ReadCloser
err := retry(func() error {
var err error
rc, err = d.bucket.Object(name).NewReader(ctx)
return err
})
if errors.Is(err, storage.ErrObjectNotExist) {
return nil, storagedriver.PathNotFoundError{Path: path, DriverName: driverName}
}
if err != nil {
return nil, fmt.Errorf("creating new reader: %w", err)
}
defer rc.Close()
p, err := io.ReadAll(rc)
if err != nil {
return nil, fmt.Errorf("reading data: %w", err)
}
return p, nil
}
// PutContent stores the []byte content at a location designated by "path".
// This should primarily be used for small objects.
func (d *driverNext) PutContent(ctx context.Context, path string, contents []byte) error {
return retry(func() error {
wc := d.bucket.Object(d.pathToKey(path)).NewWriter(ctx)
wc.ContentType = "application/octet-stream"
		// NOTE(milosgajdos/prozlach): Apparently it's possible to upload
		// 0-byte content to GCS. Setting MD5 on the Writer helps to prevent
		// persisting that data. If set, the uploaded data is rejected if its
// MD5 hash does not match this field. See:
// https://pkg.go.dev/cloud.google.com/go/storage#ObjectAttrs
// nolint: gosec
h := md5.New()
_, err := h.Write(contents)
if err != nil {
return fmt.Errorf("calculating hash: %w", err)
}
wc.MD5 = h.Sum(nil)
if len(contents) == 0 {
logrus.WithFields(logrus.Fields{
"path": path,
"length": len(contents),
"stack": string(debug.Stack()),
}).Info("PutContent called with 0 bytes")
}
return putContentsCloseNext(wc, contents)
})
}
// Reader retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset. May be used to resume reading a stream by providing a
// nonzero offset.
func (d *driverNext) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
obj := d.bucket.Object(d.pathToKey(path))
// NOTE(milosgajdos/prozlach): If length is negative, the object is read
// until the end. The request is retried until the context is canceled as
// the operation is idempotent. See links below for more info:
// https://pkg.go.dev/cloud.google.com/go/storage#ObjectHandle.NewRangeReader
// https://cloud.google.com/storage/docs/retry-strategy
r, err := obj.NewRangeReader(ctx, offset, -1)
if err != nil {
if errors.Is(err, storage.ErrObjectNotExist) {
return nil, storagedriver.PathNotFoundError{Path: path, DriverName: driverName}
}
var status *googleapi.Error
if errors.As(err, &status) {
// nolint: revive // max-control-nesting
switch status.Code {
case http.StatusNotFound:
return nil, storagedriver.PathNotFoundError{Path: path, DriverName: driverName}
case http.StatusRequestedRangeNotSatisfiable:
attrs, err := obj.Attrs(ctx)
if err != nil {
return nil, fmt.Errorf("fetching object attributes: %w", err)
}
if offset == attrs.Size {
return io.NopCloser(bytes.NewReader(make([]byte, 0))), nil
}
return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset, DriverName: driverName}
}
}
return nil, err
}
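	// Objects written with the upload-session content type hold in-progress
	// resumable-upload state rather than committed blob data, so they are not
	// readable through the driver (see the package documentation note about
	// incomplete uploads).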
if r.Attrs.ContentType == uploadSessionContentType {
_ = r.Close()
return nil, storagedriver.PathNotFoundError{Path: path, DriverName: driverName}
}
return r, nil
}
func storageDeleteObjectNext(ctx context.Context, bucket *storage.BucketHandle, name string) error {
return retry(func() error {
return bucket.Object(name).Delete(ctx)
})
}
func storageStatObjectNext(ctx context.Context, bucket *storage.BucketHandle, name string) (*storage.ObjectAttrs, error) {
var obj *storage.ObjectAttrs
err := retry(func() error {
var err error
obj, err = bucket.Object(name).Attrs(ctx)
return err
})
return obj, err
}
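// storageListObjectsNext returns an iterator over the objects matching q.
// Note that (BucketHandle).Objects only constructs the iterator; the actual
// list RPCs are issued by the iterator's Next calls, which is why the retry
// wrapper below never observes an error.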
func storageListObjectsNext(ctx context.Context, bucket *storage.BucketHandle, q *storage.Query) (*storage.ObjectIterator, error) {
var objs *storage.ObjectIterator
err := retry(func() error {
var err error
objs = bucket.Objects(ctx, q)
return err
})
return objs, err
}
func storageCopyObjectNext(ctx context.Context, bucket *storage.BucketHandle, srcName, destName string) (*storage.ObjectAttrs, error) {
var obj *storage.ObjectAttrs
err := retry(func() error {
var err error
src := bucket.Object(srcName)
dst := bucket.Object(destName)
obj, err = dst.CopierFrom(src).Run(ctx)
return err
})
return obj, err
}
func putContentsCloseNext(wc *storage.Writer, contents []byte) error {
size := len(contents)
var nn int
var err error
for nn < size {
		var n int
		n, err = wc.Write(contents[nn:size])
nn += n
if err != nil {
break
}
}
if err != nil {
// nolint: staticcheck,gosec // this needs some refactoring and a deeper research
wc.CloseWithError(err)
return err
}
return wc.Close()
}
// putChunkNext either completes the upload or submits another chunk.
func putChunkNext(ctx context.Context, client *http.Client, sessionURI string, chunk []byte, from, totalSize int64) (int64, error) {
// Documentation: https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload
bytesPut := int64(0)
err := retry(func() error {
req, err := http.NewRequestWithContext(ctx, http.MethodPut, sessionURI, bytes.NewReader(chunk))
if err != nil {
return fmt.Errorf("creating new http request: %w", err)
}
length := int64(len(chunk))
to := from + length - 1
req.Header.Set("Content-Type", "application/octet-stream")
// NOTE(prozlach) fake-gcs-server documents this behavior well:
// https://github.com/fsouza/fake-gcs-server/blob/1e954726309326217b4c533bb9646e30f683fa66/fakestorage/upload.go#L467-L501
//
// A resumable upload is sent in one or more chunks. The request's
// "Content-Range" header is used to determine if more data is expected.
//
// When sending streaming content, the total size is unknown until the stream
// is exhausted. The Go client always sends streaming content. The sequence of
// "Content-Range" headers for 2600-byte content sent in 1000-byte chunks are:
//
// Content-Range: bytes 0-999/*
// Content-Range: bytes 1000-1999/*
// Content-Range: bytes 2000-2599/*
// Content-Range: bytes */2600
//
// When sending chunked content of a known size, the total size is sent as
// well. The Python client uses this method to upload files and in-memory
// content. The sequence of "Content-Range" headers for the 2600-byte content
// sent in 1000-byte chunks are:
//
// Content-Range: bytes 0-999/2600
// Content-Range: bytes 1000-1999/2600
// Content-Range: bytes 2000-2599/2600
//
// The server collects the content, analyzes the "Content-Range", and returns a
// "308 Permanent Redirect" response if more chunks are expected, and a
// "200 OK" response if the upload is complete (the Go client also accepts a
// "201 Created" response). The "Range" header in the response should be set to
// the size of the content received so far, such as:
//
// Range: bytes 0-2000
//
// The client (such as the Go client) can send a header "X-Guploader-No-308" if
// it can't process a native "308 Permanent Redirect". The in-process response
// then has a status of "200 OK", with a header "X-Http-Status-Code-Override"
// set to "308".
size := "*"
if totalSize >= 0 {
size = strconv.FormatInt(totalSize, 10)
}
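		// An empty chunk (from == to+1) carries no new bytes; the request
		// only queries or finalizes the upload, e.g.
		// "Content-Range: bytes */2600" once the total size is known (see the
		// Content-Range sequences described above).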
if from == to+1 {
req.Header.Set("Content-Range", fmt.Sprintf("bytes */%v", size))
} else {
req.Header.Set("Content-Range", fmt.Sprintf("bytes %v-%v/%v", from, to, size))
}
req.Header.Set("Content-Length", strconv.FormatInt(length, 10))
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("executing http request: %w", err)
}
defer resp.Body.Close()
if totalSize < 0 && resp.StatusCode == 308 {
// NOTE(prozlach): Quoting google docs:
// https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload
//
// A 200 OK or 201 Created response indicates that the upload was
// completed, and no further action is necessary
//
// A 308 Resume Incomplete response indicates resumable upload is
// in progress. If Cloud Storage has not yet persisted any bytes,
// the 308 response does not have a Range header. In this case, we
// should start upload from the beginning. Otherwise, the 308
// response has a Range header, which specifies which bytes Cloud
// Storage has persisted so far.
			groups := rangeHeader.FindStringSubmatch(resp.Header.Get("Range"))
			if groups == nil {
				// Per the note above: no Range header means Cloud Storage
				// has not persisted any bytes yet, so report zero bytes put
				// and let the caller re-send the chunk from the beginning.
				bytesPut = 0
				return nil
			}
			end, err := strconv.ParseInt(groups[2], 10, 64)
if err != nil {
return fmt.Errorf("parsing Range header: %w", err)
}
bytesPut = end - from + 1
return nil
}
err = googleapi.CheckMediaResponse(resp)
if err != nil {
return fmt.Errorf("committing resumable upload: %w", err)
}
bytesPut = to - from + 1
return nil
})
return bytesPut, err
}
// Writer returns a FileWriter which will store the content written to it
// at the location designated by "path" after the call to Commit.
func (d *driverNext) Writer(ctx context.Context, path string, doAppend bool) (storagedriver.FileWriter, error) {
w := &writerNext{
ctx: ctx,
client: d.client,
object: d.bucket.Object(d.pathToKey(path)),
buffer: make([]byte, d.chunkSize),
}
	// NOTE(prozlach): Dear future maintainer of this code, the concurrency
	// model that GCS has comes with some severe limitations.
	//
	// When we create a new Writer, a new session URL is obtained, which in
	// turn creates a new resumable upload. GCS has last-write-wins semantics,
	// so when multiple Writers are targeting the same path, the last
	// committed write silently wins. Not ideal, but at least there will
	// not be data corruption.
	// The problem starts when there is more than one writer resuming the same
	// upload - they will read back the same session URL from the temporary
	// object, and hence try to append to the same in-progress file upload. GCS
	// ignores data for the offsets that it has already committed, so the
	// outcomes aren't predictable and there is going to be data corruption.
	// Fortunately there is no need to fix this ATM, as the way that
	// container-registry creates paths makes collisions impossible(?),
	// and the filesystem driver is not fully consistent either. Only the
	// Azure v2 and S3 v2 drivers are ATM.
	// Fixing it would require some non-trivial refactoring, so let's wait
	// until we have a use-case that would justify it.
if doAppend {
err := w.init()
if err != nil {
return nil, err
}
}
return w, nil
}
// Stat retrieves the FileInfo for the given path, including the current
// size in bytes and the creation time.
func (d *driverNext) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
var fi storagedriver.FileInfoFields
// try to get as file
obj, err := storageStatObjectNext(ctx, d.bucket, d.pathToKey(path))
if err == nil {
if obj.ContentType == uploadSessionContentType {
return nil, storagedriver.PathNotFoundError{Path: path, DriverName: driverName}
}
fi = storagedriver.FileInfoFields{
Path: path,
Size: obj.Size,
ModTime: obj.Updated,
IsDir: false,
}
return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
}
// try to get as folder
dirpath := d.pathToDirKey(path)
query := &storage.Query{}
query.Prefix = dirpath
it, err := storageListObjectsNext(ctx, d.bucket, query)
if err != nil {
return nil, fmt.Errorf("listing objects: %w", err)
}
attrs, err := it.Next()
if err == iterator.Done {
return nil, storagedriver.PathNotFoundError{Path: path, DriverName: driverName}
}
if err != nil {
return nil, fmt.Errorf("fetching next page from iterator: %w", err)
}
fi = storagedriver.FileInfoFields{
Path: path,
IsDir: true,
}
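	// If an object exists at exactly the directory key (a directory
	// placeholder), surface its size and modification time on the returned
	// FileInfo.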
if attrs.Name == dirpath {
fi.Size = attrs.Size
fi.ModTime = attrs.Updated
}
return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
}
// List returns a list of the objects that are direct descendants of the
// given path.
func (d *driverNext) List(ctx context.Context, path string) ([]string, error) {
query := &storage.Query{}
query.Delimiter = "/"
query.Prefix = d.pathToDirKey(path)
if query.Prefix == "/" {
query.Prefix = ""
}
list := make([]string, 0, 64)
it, err := storageListObjectsNext(ctx, d.bucket, query)
if err != nil {
return nil, fmt.Errorf("listing objects: %w", err)
}
for {
attrs, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return nil, fmt.Errorf("fetching next page from iterator: %w", err)
}
// GCS does not guarantee strong consistency between
// DELETE and LIST operations. Check that the object is not deleted,
// and filter out any objects with a non-zero time-deleted
if attrs.Deleted.IsZero() && attrs.ContentType != uploadSessionContentType {
var name string
if len(attrs.Prefix) > 0 {
name = attrs.Prefix
} else {
name = attrs.Name
}
list = append(list, d.keyToPath(name))
}
}
if path != "/" && len(list) == 0 {
// Treat empty response as missing directory, since we don't actually
// have directories in Google Cloud Storage.
return nil, storagedriver.PathNotFoundError{Path: path, DriverName: driverName}
}
return list, nil
}
// Move moves an object stored at sourcePath to destPath, removing the
// original object.
func (d *driverNext) Move(ctx context.Context, sourcePath, destPath string) error {
_, err := storageCopyObjectNext(ctx, d.bucket, d.pathToKey(sourcePath), d.pathToKey(destPath))
if err != nil {
var gerr *googleapi.Error
if errors.As(err, &gerr) {
if gerr.Code == http.StatusNotFound {
return storagedriver.PathNotFoundError{Path: sourcePath, DriverName: driverName}
}
}
return err
}
err = storageDeleteObjectNext(ctx, d.bucket, d.pathToKey(sourcePath))
// if deleting the file fails, log the error, but do not fail; the file was successfully copied,
// and the original should eventually be cleaned when purging the uploads folder.
if err != nil {
logrus.Infof("error deleting file: %v due to %v", sourcePath, err)
}
return nil
}
// listAll recursively lists all names of objects stored at "prefix" and its subpaths.
func (d *driverNext) listAll(ctx context.Context, prefix string) ([]string, error) {
list := make([]string, 0, 64)
query := &storage.Query{}
query.Prefix = prefix
query.Versions = false
it, err := storageListObjectsNext(ctx, d.bucket, query)
if err != nil {
return nil, fmt.Errorf("listing objects: %w", err)
}
for {
attrs, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return nil, fmt.Errorf("fetching next page from iterator: %w", err)
}
// GCS does not guarantee strong consistency between
// DELETE and LIST operations. Check that the object is not deleted,
// and filter out any objects with a non-zero time-deleted
if attrs.Deleted.IsZero() {
list = append(list, attrs.Name)
}
}
return list, nil
}
// Delete recursively deletes all objects stored at "path" and its subpaths.
func (d *driverNext) Delete(ctx context.Context, path string) error {
prefix := d.pathToDirKey(path)
keys, err := d.listAll(ctx, prefix)
if err != nil {
return err
}
if len(keys) > 0 {
		// NOTE(milosgajdos/prozlach): d.listAll calls (BucketHandle).Objects.
		// Objects are iterated over lexicographically by name, so we don't
		// have to sort or reverse the slice itself; we can simply range over
		// the keys slice in reverse order.
		// docs:
		// https://pkg.go.dev/cloud.google.com/go/storage#BucketHandle.Objects
for i := len(keys) - 1; i >= 0; i-- {
err := storageDeleteObjectNext(ctx, d.bucket, keys[i])
// GCS only guarantees eventual consistency, so listAll might return
// paths that no longer exist. If this happens, just ignore any not
// found error
if err != nil && !errors.Is(err, storage.ErrObjectNotExist) {
return fmt.Errorf("deleting object: %w", err)
}
}
return nil
}
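	// Nothing was found under the directory prefix; treat path as a single
	// object key and delete it directly.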
err = storageDeleteObjectNext(ctx, d.bucket, d.pathToKey(path))
if errors.Is(err, storage.ErrObjectNotExist) {
return storagedriver.PathNotFoundError{Path: path, DriverName: driverName}
}
return err
}
// DeleteFiles deletes a set of files concurrently, using a separate goroutine
// for each, up to a maximum of maxDeleteConcurrency. Returns the number of
// successfully deleted files and any errors. This method is idempotent; no
// error is returned if a file does not exist.
func (d *driverNext) DeleteFiles(ctx context.Context, paths []string) (int, error) {
errs := new(multierror.Error)
errsMutex := new(sync.Mutex)
// Count the number of successfully deleted files across concurrent requests
	// NOTE(prozlach): Using int32 gives us up to 2 147 483 647 files that can
	// be deleted, which should suffice. Using int64 would trigger linter
	// warnings, esp. on 32-bit platforms.
count := new(atomic.Int32)
g, gctx := errgroup.WithContext(ctx)
g.SetLimit(maxDeleteConcurrency)
for _, path := range paths {
g.Go(func() error {
// Check if any context was canceled, if so - skip calling Delete
// as it will fail anyway.
select {
case <-gctx.Done():
errsMutex.Lock()
defer errsMutex.Unlock()
errs = multierror.Append(errs, gctx.Err())
return nil
default:
}
err := storageDeleteObjectNext(gctx, d.bucket, d.pathToKey(path))
if err != nil && !errors.Is(err, storage.ErrObjectNotExist) {
errsMutex.Lock()
defer errsMutex.Unlock()
errs = multierror.Append(errs, err)
} else {
// count successfully deleted files
count.Add(1)
}
return nil
})
}
	// Wait for all goroutines. g.Wait() will return the first error returned
	// by any g.Go func, or nil if all returned nil. Since we always return
	// nil from g.Go here, we rely solely on the errors collected in the
	// multierror var.
_ = g.Wait()
return int(count.Load()), errs.ErrorOrNil()
}
// URLFor returns a URL which may be used to retrieve the content stored at
// the given path, possibly using the given options.
// Returns ErrUnsupportedMethod if the requested HTTP method is not supported.
func (d *driverNext) URLFor(_ context.Context, path string, options map[string]any) (string, error) {
name := d.pathToKey(path)
methodString := http.MethodGet
method, ok := options["method"]
if ok {
methodString, ok = method.(string)
if !ok || (methodString != http.MethodGet && methodString != http.MethodHead) {
return "", storagedriver.ErrUnsupportedMethod{DriverName: driverName}
}
}
expiresTime := systemClock.Now().Add(20 * time.Minute)
expires, ok := options["expiry"]
if ok {
et, ok := expires.(time.Time)
if ok {
expiresTime = et
}
}
opts := &storage.SignedURLOptions{
Method: methodString,
Expires: expiresTime,
QueryParameters: storagedriver.CustomParams(options, customParamKeys),
Scheme: storage.SigningSchemeV4,
}
if d.privateKey != nil && d.email != "" {
// If we have a private key and email from service account JSON, use them directly
opts.GoogleAccessID = d.email
opts.PrivateKey = d.privateKey
}
	// Signing a URL requires credentials authorized to sign a URL. They can be
	// passed through SignedURLOptions with one of the following options:
	// a. a Google service account private key, obtainable from the Google Developers Console
	// b. a Google Access ID with iam.serviceAccounts.signBlob permissions
	// c. a SignBytes function implementing custom signing.
	// If none of these options is set, the SignedURL function attempts to use
	// the same authentication that was used to instantiate the Storage
	// client, which in our case is instance profile credentials.
	// Doc: https://cloud.google.com/storage/docs/access-control/signing-urls-with-helpers#download-object
return d.bucket.SignedURL(name, opts)
}
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driverNext) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFallback(ctx, d, path, f)
}
// WalkParallel traverses a filesystem defined within driver in parallel, starting
// from the given path, calling f on each file.
func (d *driverNext) WalkParallel(ctx context.Context, path string, f storagedriver.WalkFn) error {
// If the ParallelWalk feature flag is not set, fall back to standard sequential walk.
if !d.parallelWalk {
return d.Walk(ctx, path, f)
}
return storagedriver.WalkFallbackParallel(ctx, d, maxWalkConcurrency, path, f)
}
// startSessionNext starts a new resumable upload session. Documentation can be
// found here: https://cloud.google.com/storage/docs/performing-resumable-uploads#initiate-session
func startSessionNext(ctx context.Context, client *http.Client, bucket, name string) (uri string, err error) {
u := &url.URL{
Scheme: "https",
// https://cloud.google.com/storage/docs/request-endpoints#typical
Host: "storage.googleapis.com",
Path: fmt.Sprintf("/upload/storage/v1/b/%v/o", bucket),
RawQuery: fmt.Sprintf("uploadType=resumable&name=%v", name),
}
err = retry(func() error {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), nil)
if err != nil {
return err
}
req.Header.Set("X-Upload-Content-Type", "application/octet-stream")
req.Header.Set("Content-Length", "0")
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("executing http request: %w", err)
}
defer resp.Body.Close()
err = googleapi.CheckMediaResponse(resp)
if err != nil {
return fmt.Errorf("starting new resumable upload session: %w", err)
}
uri = resp.Header.Get("Location")
return nil
})
return uri, err
}
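// pathToKey converts a driver path (e.g. "/docker/registry/v2/blobs") into a
// GCS object key by prefixing the configured rootDirectory and trimming the
// leading and trailing slashes.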
func (d *driverNext) pathToKey(path string) string {
return strings.TrimSpace(strings.TrimRight(d.rootDirectory+strings.TrimLeft(path, "/"), "/"))
}
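// pathToDirKey converts a driver path into an object key prefix with a
// trailing slash, suitable for listing the "directory" contents.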
func (d *driverNext) pathToDirKey(path string) string {
return d.pathToKey(path) + "/"
}
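// keyToPath converts a GCS object key back into a driver path by stripping
// the rootDirectory prefix and ensuring a single leading slash.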
func (d *driverNext) keyToPath(key string) string {
return "/" + strings.Trim(strings.TrimPrefix(key, d.rootDirectory), "/")
}
type writerNext struct {
ctx context.Context
client *http.Client
object *storage.ObjectHandle
	// size is the amount of data written to this writer using Write(), but
	// not necessarily committed yet (i.e. offset + buffSize). If the Writer
	// was resumed, then it includes the offset of the data already persisted
	// in the previous session.
	size int64
	// offset is the amount of data persisted in the backend, be it as a
	// resumable upload yet to be committed or data written in
	// putContentsCloseNext. It does not mean that this data is visible in the
	// bucket, as resumable uploads require a final call to Commit() to finish
	// the upload and make the data accessible.
offset int64
closed bool
committed bool
canceled bool
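	// sessionURI is the resumable upload session URL obtained from GCS. It is
	// empty until writeChunk starts a session (or until a previous session is
	// restored by init).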
sessionURI string
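	// buffer holds bytes written but not yet uploaded; Write flushes it
	// through writeChunk once it fills up to chunkSize. buffSize is the
	// number of valid bytes currently held in buffer.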
buffer []byte
buffSize int64
}
// Cancel removes any written content from this FileWriter.
func (w *writerNext) Cancel() error {
if w.closed {
return storagedriver.ErrAlreadyClosed
} else if w.committed {
return storagedriver.ErrAlreadyCommited
}
w.canceled = true
err := retry(func() error { return w.object.Delete(w.ctx) })
if err != nil {
if errors.Is(err, storage.ErrObjectNotExist) {
return nil
}
var gerr *googleapi.Error
if errors.As(err, &gerr) && gerr.Code == http.StatusNotFound {
return nil
}
return fmt.Errorf("deleting object while canceling writer: %w", err)
}
return nil
}
// Close stores the current resumable upload session in a temporary object
// provided that the session has not been committed nor canceled yet.
func (w *writerNext) Close() error {
if w.closed {
return storagedriver.ErrAlreadyClosed
}
w.closed = true
if w.canceled {
// NOTE(prozlach): If the writer has already been canceled, then there
// is nothing to flush to the backend as the target file has already
// been deleted.
return nil
}
if w.committed {
// we are done already, return early
return nil
}
err := w.writeChunk()
if err != nil {
return fmt.Errorf("writing chunk: %w", err)
}
	// Copy the remaining bytes from the buffer to the upload session object.
	// Normally buffSize will be smaller than minChunkSize. However, in the
	// unlikely event that the upload session failed to start, this number
	// could be higher. In this case we can safely clip the remaining bytes to
	// minChunkSize.
if w.buffSize > minChunkSize {
w.buffSize = minChunkSize
}
// commit the writes by updating the upload session
err = retry(func() error {
wc := w.object.NewWriter(w.ctx)
wc.ContentType = uploadSessionContentType
wc.Metadata = map[string]string{
"Session-URI": w.sessionURI,
"Offset": strconv.FormatInt(w.offset, 10),
}
return putContentsCloseNext(wc, w.buffer[0:w.buffSize])
})
if err != nil {
return fmt.Errorf("writing contents while closing writer: %w", err)
}
w.buffSize = 0
return nil
}
// Commit flushes all content written to this FileWriter and makes it
// available for future calls to StorageDriver.GetContent and
// StorageDriver.Reader.
func (w *writerNext) Commit() error {
switch {
case w.closed:
return storagedriver.ErrAlreadyClosed
case w.committed:
return storagedriver.ErrAlreadyCommited
case w.canceled:
return storagedriver.ErrAlreadyCanceled
}
w.committed = true
	// no session has been started yet, so just perform a simple upload
if w.sessionURI == "" {
err := retry(func() error {
wc := w.object.NewWriter(w.ctx)
wc.ContentType = "application/octet-stream"
return putContentsCloseNext(wc, w.buffer[0:w.buffSize])
})
if err != nil {
return err
}
// putContentsCloseNext overwrites the object if it was already
// present, so offset is going to be equal to w.size.
w.offset = w.size
w.buffSize = 0
return nil
}
var nn int64
	// NOTE(prozlach): the loop must be performed at least once to ensure the
	// file is committed even when the buffer is empty. The first iteration,
	// even with a zero-length chunk, is what actually commits the file.
for {
n, err := putChunkNext(w.ctx, w.client, w.sessionURI, w.buffer[nn:w.buffSize], w.offset, w.size)
nn += n
w.offset += n
if err != nil {
w.buffSize = int64(copy(w.buffer, w.buffer[nn:w.buffSize])) // nolint: gosec // copy() is always going to be non-negative
return err
}
if nn == w.buffSize {
break
}
}
w.buffSize = 0
return nil
}
func (w *writerNext) writeChunk() error {
var err error
// chunks can be uploaded only in multiples of minChunkSize
// chunkSize is a multiple of minChunkSize less than or equal to buffSize
chunkSize := w.buffSize - (w.buffSize % minChunkSize)
if chunkSize == 0 {
return nil
}
	// if there is no sessionURI yet, obtain one by starting the session
if w.sessionURI == "" {
w.sessionURI, err = startSessionNext(w.ctx, w.client, w.object.BucketName(), w.object.ObjectName())
}
if err != nil {
return err
}
nn, err := putChunkNext(w.ctx, w.client, w.sessionURI, w.buffer[0:chunkSize], w.offset, -1)
w.offset += nn
// shift the remaining bytes to the start of the buffer
w.buffSize = int64(copy(w.buffer, w.buffer[nn:w.buffSize])) // nolint: gosec // copy() is always going to be non-negative
return err
}
func (w *writerNext) Write(p []byte) (int, error) {
switch {
case w.closed:
return 0, storagedriver.ErrAlreadyClosed
case w.committed:
return 0, storagedriver.ErrAlreadyCommited
case w.canceled:
return 0, storagedriver.ErrAlreadyCanceled
}
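	// Buffer incoming bytes and flush a chunk via writeChunk every time the
	// buffer fills up to its capacity (chunkSize).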
var err error
var nn int
for nn < len(p) {
n := copy(w.buffer[w.buffSize:], p[nn:])
w.buffSize += int64(n) // nolint: gosec // copy() is always going to be non-negative
		// NOTE(prozlach): It should be safe to bump the size of the writer
		// before the data is actually committed to the backend because:
		// * data is loaded into the buffer, so even if this attempt to
		//   upload it fails, the caller may retry and the data may still get
		//   uploaded.
		// * the writeChunk->putChunkNext methods prune the buffer only when
		//   the data was successfully uploaded.
		// * this follows the semantics of the Azure and S3 drivers.
w.size += int64(n)
if w.buffSize == int64(cap(w.buffer)) {
err = w.writeChunk()
if err != nil {
break
}
}
nn += n
}
return nn, err
}
// Size returns the number of bytes written to this FileWriter.
func (w *writerNext) Size() int64 {
return w.size
}
// init restores a resumable upload based on the Offset and Session-URI
// metadata and the `tail` of the data stored in the temporary object.
// Due to the minimum chunk size limitation of GCS, we can't upload all the
// bytes remaining in the buffer if there are fewer than minChunkSize, so they
// are stored in the temporary object created during the Close() call and then
// read back into the buffer here. The temporary object and the resumable
// upload coexist until the resumable upload is committed. The Commit
// operation overwrites the temporary object.
// The buffer size is never smaller than minChunkSize and the amount of tail
// data is never larger than minChunkSize, hence there will always be enough
// space in the buffer to read back the temporary data in this call.
func (w *writerNext) init() error {
attrs, err := w.object.Attrs(w.ctx)
if err != nil {
var gcsErr *googleapi.Error
if errors.As(err, &gcsErr) && gcsErr.Code == http.StatusNotFound {
return storagedriver.PathNotFoundError{Path: w.object.ObjectName(), DriverName: driverName}
}
return fmt.Errorf("fetching object: %w", err)
}
if attrs.ContentType != uploadSessionContentType {
return storagedriver.PathNotFoundError{Path: w.object.ObjectName(), DriverName: driverName}
}
if attrs.Metadata["Offset"] == "" {
return fmt.Errorf("`X-Goog-Meta-Offset` HTTP header has not been set for path %s", w.object.ObjectName())
}
offset, err := strconv.ParseInt(attrs.Metadata["Offset"], 10, 64)
if err != nil {
return fmt.Errorf("parsing `X-Goog-Meta-Offset` HTTP header: %w", err)
}
r, err := w.object.NewReader(w.ctx)
if err != nil {
return fmt.Errorf("parsing `X-Goog-Meta-Offset` HTTP header: %w", err)
}
defer r.Close()
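	// Read the `tail` data stored in the temporary object back into the
	// buffer so it can be uploaded together with subsequently written bytes.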
for err == nil && w.buffSize < int64(len(w.buffer)) {
var n int
n, err = r.Read(w.buffer[w.buffSize:])
w.buffSize += int64(n)
}
if err != nil && err != io.EOF {
return fmt.Errorf("reading `tail` data during writer resume: %w", err)
}
w.sessionURI = attrs.Metadata["Session-URI"]
w.offset = offset
w.size = offset + w.buffSize
return nil
}