// internal/storage/bucket_handle.go
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// For now, we are not writing unit tests that require multiple versions of
// the same object, since that is not supported by fake-storage-server.
// Although an API is exposed to enable object versioning for a bucket, it
// returns "method not allowed" when called.
package storage
import (
"context"
"fmt"
"io"
"time"

"cloud.google.com/go/storage"
"cloud.google.com/go/storage/control/apiv2/controlpb"
"github.com/googleapis/gax-go/v2"
"github.com/googlecloudplatform/gcsfuse/v2/internal/logger"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil"
"google.golang.org/api/iterator"
)
const FullFolderPathHNS = "projects/_/buckets/%s/folders/%s"
const FullBucketPathHNS = "projects/_/buckets/%s"
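// For example (illustrative bucket and folder names), folder "a/b/" in
// bucket "my-bucket" is addressed as
// "projects/_/buckets/my-bucket/folders/a/b/", and the bucket itself as
// "projects/_/buckets/my-bucket".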
type bucketHandle struct {
gcs.Bucket
bucket *storage.BucketHandle
bucketName string
bucketType *gcs.BucketType
controlClient StorageControlClient
}
func (bh *bucketHandle) Name() string {
return bh.bucketName
}
func (bh *bucketHandle) BucketType() gcs.BucketType {
return *bh.bucketType
}
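// NewReaderWithReadHandle opens a range reader over the requested object. A
// minimal caller sketch (illustrative names; assumes gcs.ByteRange describes
// the half-open interval [Start, Limit), so Start=2, Limit=5 reads 3 bytes):
//
//    rc, err := bh.NewReaderWithReadHandle(ctx, &gcs.ReadObjectRequest{
//        Name:  "dir/file.txt",
//        Range: &gcs.ByteRange{Start: 2, Limit: 5},
//    })
//    if err != nil {
//        return err
//    }
//    defer rc.Close()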
func (bh *bucketHandle) NewReaderWithReadHandle(
ctx context.Context,
req *gcs.ReadObjectRequest) (reader gcs.StorageReader, err error) {
defer func() {
err = gcs.GetGCSError(err)
}()
// Initialising the starting offset and the length to be read by the reader.
start := int64(0)
length := int64(-1)
// Following the semantics of NewRangeReader method.
// If length is negative, the object is read until the end.
// If offset is negative, the object is read abs(offset) bytes from the end,
// and length must also be negative to indicate all remaining bytes will be read.
// Ref: https://github.com/GoogleCloudPlatform/gcsfuse/blob/34211af652dbaeb012b381a3daf3c94b95f65e00/vendor/cloud.google.com/go/storage/reader.go#L80
if req.Range != nil {
start = int64((*req.Range).Start)
end := int64((*req.Range).Limit)
length = end - start
}
obj := bh.bucket.Object(req.Name)
// Switching to the requested generation of object.
if req.Generation != 0 {
obj = obj.Generation(req.Generation)
}
if req.ReadCompressed {
obj = obj.ReadCompressed(true)
}
// Insert ReadHandle into objectHandle if present.
// Objects that have been opened can be opened again using readHandle at lower latency.
// This produces the exact same object and generation and does not check if
// the generation is still the newest one.
if req.ReadHandle != nil {
obj = obj.ReadHandle(req.ReadHandle)
}
// NewRangeReader returns a storage.Reader, which is also an io.ReadCloser since it implements both the Read() and Close() methods of that interface.
reader, err = obj.NewRangeReader(ctx, start, length)
return
}
func (bh *bucketHandle) DeleteObject(ctx context.Context, req *gcs.DeleteObjectRequest) (err error) {
defer func() {
err = gcs.GetGCSError(err)
}()
obj := bh.bucket.Object(req.Name)
// Switching to the requested generation of the object. By default, generation
// is 0, which signifies the latest generation. Note: GCS will delete the
// live object even if generation is not set in the request. We pass 0
// explicitly to satisfy the idempotency condition.
obj = obj.Generation(req.Generation)
// Requiring that the object's MetaGeneration match the requested MetaGeneration for the deletion to occur.
if req.MetaGenerationPrecondition != nil && *req.MetaGenerationPrecondition != 0 {
obj = obj.If(storage.Conditions{MetagenerationMatch: *req.MetaGenerationPrecondition})
}
err = obj.Delete(ctx)
if err != nil {
err = fmt.Errorf("error in deleting object: %w", err)
}
return
}
func (bh *bucketHandle) StatObject(ctx context.Context,
req *gcs.StatObjectRequest) (m *gcs.MinObject, e *gcs.ExtendedObjectAttributes, err error) {
defer func() {
err = gcs.GetGCSError(err)
}()
var attrs *storage.ObjectAttrs
// Retrieving object attrs through Go Storage Client.
attrs, err = bh.bucket.Object(req.Name).Attrs(ctx)
if err != nil {
err = fmt.Errorf("error in fetching object attributes: %w", err)
return
}
// Converting attrs to type *Object
o := storageutil.ObjectAttrsToBucketObject(attrs)
m = storageutil.ConvertObjToMinObject(o)
if req.ReturnExtendedObjectAttributes {
e = storageutil.ConvertObjToExtendedObjectAttributes(o)
}
return
}
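// getObjectHandleWithPreconditionsSet translates the request's pointer-valued
// preconditions into storage.Conditions. For example (illustrative values): a
// GenerationPrecondition of 0 becomes DoesNotExist: true (create-only), while
// a GenerationPrecondition of 1234 becomes GenerationMatch: 1234 (overwrite
// only that exact generation).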
func (bh *bucketHandle) getObjectHandleWithPreconditionsSet(req *gcs.CreateObjectRequest) *storage.ObjectHandle {
obj := bh.bucket.Object(req.Name)
// GenerationPrecondition - If non-nil, the object will be created/overwritten
// only if the current generation for the object name is equal to the given value.
// Zero means the object does not exist.
// MetaGenerationPrecondition - If non-nil and non-zero, the object will be created/overwritten
// only if the current metaGeneration for the object name is equal to the given value.
// A zero value is ignored below, since metagenerations start at 1.
preconditions := storage.Conditions{}
if req.GenerationPrecondition != nil {
if *req.GenerationPrecondition == 0 {
preconditions.DoesNotExist = true
} else {
preconditions.GenerationMatch = *req.GenerationPrecondition
}
}
if req.MetaGenerationPrecondition != nil && *req.MetaGenerationPrecondition != 0 {
preconditions.MetagenerationMatch = *req.MetaGenerationPrecondition
}
// Setting the conditions on the object only if at least one of them is set,
// i.e., the conditions struct is not empty.
if isStorageConditionsNotEmpty(preconditions) {
obj = obj.If(preconditions)
}
return obj
}
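// A minimal CreateObject caller sketch (illustrative names; assumes the
// request fields shown here; a zero GenerationPrecondition makes the write
// create-only):
//
//    var precond int64 = 0
//    obj, err := bh.CreateObject(ctx, &gcs.CreateObjectRequest{
//        Name:                   "dir/new-file.txt",
//        Contents:               strings.NewReader("hello"),
//        GenerationPrecondition: &precond,
//    })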
func (bh *bucketHandle) CreateObject(ctx context.Context, req *gcs.CreateObjectRequest) (o *gcs.Object, err error) {
defer func() {
err = gcs.GetGCSError(err)
}()
obj := bh.getObjectHandleWithPreconditionsSet(req)
// Creating a NewWriter with the requested attributes, using the Go Storage Client.
// Chunk size for resumable uploads is the default, i.e. 16MB.
wc := obj.NewWriter(ctx)
wc.ChunkTransferTimeout = time.Duration(req.ChunkTransferTimeoutSecs) * time.Second
wc = storageutil.SetAttrsInWriter(wc, req)
wc.ProgressFunc = func(bytesUploadedSoFar int64) {
logger.Tracef("gcs: Req %#16x: -- CreateObject(%q): %20v bytes uploaded so far", ctx.Value(gcs.ReqIdField), req.Name, bytesUploadedSoFar)
}
// All objects in zonal buckets must be appendable.
wc.Append = bh.BucketType().Zonal
// FinalizeOnClose should be true for all writes for now.
wc.FinalizeOnClose = true
// Copy the contents to the writer.
if _, err = io.Copy(wc, req.Contents); err != nil {
err = fmt.Errorf("error in io.Copy: %w", err)
return
}
// We can't use defer to close the writer, because we need to close the
// writer successfully before calling Attrs() method of writer.
if err = wc.Close(); err != nil {
err = fmt.Errorf("error in closing writer : %w", err)
return
}
attrs := wc.Attrs() // Retrieving the attributes of the created object.
// Converting attrs to type *Object.
o = storageutil.ObjectAttrsToBucketObject(attrs)
return
}
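// CreateObjectChunkWriter, FlushPendingWrites, and FinalizeUpload together
// support streamed uploads: create the writer, write blocks, optionally
// flush, then finalize. A rough flow sketch (illustrative; error handling and
// block contents elided):
//
//    w, _ := bh.CreateObjectChunkWriter(ctx, req, 8*1024*1024, nil) // 8 MiB chunks
//    _, _ = w.Write(blockA)
//    _, _ = w.Write(blockB)
//    minObj, _ := bh.FinalizeUpload(ctx, w) // closes the writer and returns attrs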
func (bh *bucketHandle) CreateObjectChunkWriter(ctx context.Context, req *gcs.CreateObjectRequest, chunkSize int, callBack func(bytesUploadedSoFar int64)) (gcs.Writer, error) {
obj := bh.getObjectHandleWithPreconditionsSet(req)
wc := &ObjectWriter{obj.NewWriter(ctx)}
wc.ChunkSize = chunkSize
wc.Writer = storageutil.SetAttrsInWriter(wc.Writer, req)
if callBack == nil {
callBack = func(bytesUploadedSoFar int64) {
logger.Tracef("gcs: Req %#16x: -- UploadBlock(%q): %20v bytes uploaded so far", ctx.Value(gcs.ReqIdField), req.Name, bytesUploadedSoFar)
}
}
wc.ProgressFunc = callBack
// All objects in zonal buckets must be appendable.
wc.Append = bh.BucketType().Zonal
// FinalizeOnClose should be true for all writes for now.
wc.FinalizeOnClose = true
return wc, nil
}
func (bh *bucketHandle) FinalizeUpload(ctx context.Context, w gcs.Writer) (o *gcs.MinObject, err error) {
defer func() {
err = gcs.GetGCSError(err)
}()
if err = w.Close(); err != nil {
err = fmt.Errorf("error in closing writer : %w", err)
return
}
attrs := w.Attrs() // Retrieving the attributes of the created object.
// Converting attrs to type *MinObject.
o = storageutil.ObjectAttrsToMinObject(attrs)
return
}
func (bh *bucketHandle) FlushPendingWrites(ctx context.Context, w gcs.Writer) (o *gcs.MinObject, err error) {
defer func() {
err = gcs.GetGCSError(err)
}()
_, err = w.Flush()
if err != nil {
err = fmt.Errorf("error in FlushPendingWrites : %w", err)
return
}
attrs := w.Attrs() // Retrieving the attributes of the created object.
// Converting attrs to type *MinObject.
o = storageutil.ObjectAttrsToMinObject(attrs)
if o == nil {
return nil, fmt.Errorf("FlushPendingWrites: nil object returned after w.Flush()")
}
return
}
func (bh *bucketHandle) CopyObject(ctx context.Context, req *gcs.CopyObjectRequest) (o *gcs.Object, err error) {
defer func() {
err = gcs.GetGCSError(err)
}()
srcObj := bh.bucket.Object(req.SrcName)
dstObj := bh.bucket.Object(req.DstName)
// Switching to the requested generation of source object.
if req.SrcGeneration != 0 {
srcObj = srcObj.Generation(req.SrcGeneration)
}
// Putting a condition that the metaGeneration of the source should match *req.SrcMetaGenerationPrecondition for the copy operation to occur.
if req.SrcMetaGenerationPrecondition != nil {
srcObj = srcObj.If(storage.Conditions{MetagenerationMatch: *req.SrcMetaGenerationPrecondition})
}
objAttrs, err := dstObj.CopierFrom(srcObj).Run(ctx)
if err != nil {
err = fmt.Errorf("error in copying object: %w", err)
return
}
// Converting objAttrs to type *Object
o = storageutil.ObjectAttrsToBucketObject(objAttrs)
return
}
func getProjectionValue(req gcs.Projection) storage.Projection {
// Explicitly converting the Projection value because the Projection enums of jacobsa/gcloud and the Go Client API do not share the same numeric values.
var convertedProjection storage.Projection // Stores the Projection Value according to the Go Client API Interface.
switch int(req) {
// Projection Value 0 in jacobsa/gcloud maps to Projection Value 1 in Go Client API, that is for "full".
case 0:
convertedProjection = storage.Projection(1)
// Projection Value 1 in jacobsa/gcloud maps to Projection Value 2 in Go Client API, that is for "noAcl".
case 1:
convertedProjection = storage.Projection(2)
// Default Projection value in jacobsa/gcloud library is 0 that maps to 1 in Go Client API interface, and that is for "full".
default:
convertedProjection = storage.Projection(1)
}
return convertedProjection
}
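// ListObjects returns a single page of results. A paging sketch (illustrative
// names; the loop ends when no continuation token is returned):
//
//    req := &gcs.ListObjectsRequest{Prefix: "dir/", Delimiter: "/", MaxResults: 1000}
//    for {
//        listing, err := bh.ListObjects(ctx, req)
//        if err != nil {
//            return err
//        }
//        // ... consume listing.MinObjects and listing.CollapsedRuns ...
//        if listing.ContinuationToken == "" {
//            break
//        }
//        req.ContinuationToken = listing.ContinuationToken
//    }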
func (bh *bucketHandle) ListObjects(ctx context.Context, req *gcs.ListObjectsRequest) (listing *gcs.Listing, err error) {
defer func() {
err = gcs.GetGCSError(err)
}()
// Converting *ListObjectsRequest to type *storage.Query as expected by the Go Storage Client.
query := &storage.Query{
Delimiter: req.Delimiter,
Prefix: req.Prefix,
Projection: getProjectionValue(req.ProjectionVal),
IncludeTrailingDelimiter: req.IncludeTrailingDelimiter,
IncludeFoldersAsPrefixes: req.IncludeFoldersAsPrefixes,
//MaxResults: , (Field not present in storage.Query of Go Storage Library but present in ListObjectsQuery in Jacobsa code.)
}
err = query.SetAttrSelection([]string{"Name", "Size", "Generation", "Metageneration", "Updated", "Metadata", "ContentEncoding", "CRC32C"})
if err != nil {
err = fmt.Errorf("error while setting attribute selection for List Object query :%w", err)
return
}
itr := bh.bucket.Objects(ctx, query) // Returning iterator to the list of objects.
pi := itr.PageInfo()
pi.MaxSize = req.MaxResults
pi.Token = req.ContinuationToken
var list gcs.Listing
// Iterating through all the objects in the bucket and one by one adding them to the list.
for {
var attrs *storage.ObjectAttrs
attrs, err = itr.Next()
if err == iterator.Done {
err = nil
break
}
if err != nil {
err = fmt.Errorf("error in iterating through objects: %w", err)
return
}
// Prefix attribute will be set for the objects returned as part of Prefix[] array in list response.
// https://github.com/GoogleCloudPlatform/gcsfuse/blob/master/vendor/cloud.google.com/go/storage/storage.go#L1304
// https://github.com/GoogleCloudPlatform/gcsfuse/blob/master/vendor/cloud.google.com/go/storage/http_client.go#L370
if attrs.Prefix != "" {
list.CollapsedRuns = append(list.CollapsedRuns, attrs.Prefix)
} else {
// Converting attrs to *Object type.
currMinObject := storageutil.ObjectAttrsToMinObject(attrs)
list.MinObjects = append(list.MinObjects, currMinObject)
}
// itr.Next() would otherwise iterate over all objects in the bucket, hence
// adding a check to break after iterating over the current page.
// pi.Remaining() returns the number of items (objects + prefixes) remaining
// in the current page: it is (number of items in the current page - 1) after
// the first itr.Next() call and becomes 0 when the page is exhausted.
// If req.MaxResults is 0, wait until the iterator is done. This is similar
// to https://github.com/GoogleCloudPlatform/gcsfuse/blob/master/vendor/github.com/jacobsa/gcloud/gcs/bucket.go#L164
if req.MaxResults != 0 && (pi.Remaining() == 0) {
break
}
}
list.ContinuationToken = itr.PageInfo().Token
listing = &list
return
}
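// UpdateObject treats nil request fields as "leave unchanged". A sketch that
// updates only the content type (illustrative values):
//
//    ct := "text/plain"
//    obj, err := bh.UpdateObject(ctx, &gcs.UpdateObjectRequest{
//        Name:        "dir/file.txt",
//        ContentType: &ct,
//    })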
func (bh *bucketHandle) UpdateObject(ctx context.Context, req *gcs.UpdateObjectRequest) (o *gcs.Object, err error) {
defer func() {
err = gcs.GetGCSError(err)
}()
obj := bh.bucket.Object(req.Name)
if req.Generation != 0 {
obj = obj.Generation(req.Generation)
}
if req.MetaGenerationPrecondition != nil {
obj = obj.If(storage.Conditions{MetagenerationMatch: *req.MetaGenerationPrecondition})
}
updateQuery := storage.ObjectAttrsToUpdate{}
if req.ContentType != nil {
updateQuery.ContentType = *req.ContentType
}
if req.ContentEncoding != nil {
updateQuery.ContentEncoding = *req.ContentEncoding
}
if req.ContentLanguage != nil {
updateQuery.ContentLanguage = *req.ContentLanguage
}
if req.CacheControl != nil {
updateQuery.CacheControl = *req.CacheControl
}
if req.Metadata != nil {
updateQuery.Metadata = make(map[string]string)
for key, element := range req.Metadata {
if element != nil {
updateQuery.Metadata[key] = *element
}
}
}
attrs, err := obj.Update(ctx, updateQuery)
if err != nil {
err = fmt.Errorf("error in updating object: %w", err)
return
}
// Converting objAttrs to type *Object
o = storageutil.ObjectAttrsToBucketObject(attrs)
return
}
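// A minimal ComposeObjects sketch concatenating two sources into one
// destination (illustrative names; assumes the source element type is
// gcs.ComposeSource; a zero DstGenerationPrecondition makes the compose
// create-only):
//
//    var precond int64 = 0
//    obj, err := bh.ComposeObjects(ctx, &gcs.ComposeObjectsRequest{
//        DstName:                   "dir/combined",
//        DstGenerationPrecondition: &precond,
//        Sources: []gcs.ComposeSource{
//            {Name: "dir/part-0"},
//            {Name: "dir/part-1"},
//        },
//    })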
func (bh *bucketHandle) ComposeObjects(ctx context.Context, req *gcs.ComposeObjectsRequest) (o *gcs.Object, err error) {
defer func() {
err = gcs.GetGCSError(err)
}()
dstObj := bh.bucket.Object(req.DstName)
dstObjConds := storage.Conditions{}
if req.DstMetaGenerationPrecondition != nil {
dstObjConds.MetagenerationMatch = *req.DstMetaGenerationPrecondition
}
// DstGenerationPrecondition or DoesNotExist should be set in dstObj
// preconditions to make requests Idempotent.
// https://github.com/GoogleCloudPlatform/gcsfuse/blob/7ad451c6f2ead7992e030503e5b66c555b2ebf71/vendor/cloud.google.com/go/storage/copy.go#L230
if req.DstGenerationPrecondition != nil {
if *req.DstGenerationPrecondition == 0 {
dstObjConds.DoesNotExist = true
} else {
dstObjConds.GenerationMatch = *req.DstGenerationPrecondition
}
}
// Only set conditions on dstObj if there is at least one condition in
// dstObjConds. Otherwise, storage client library gives empty conditions error.
// https://github.com/GoogleCloudPlatform/gcsfuse/blob/7ad451c6f2ead7992e030503e5b66c555b2ebf71/vendor/cloud.google.com/go/storage/storage.go#L1739
if isStorageConditionsNotEmpty(dstObjConds) {
dstObj = dstObj.If(dstObjConds)
}
// Converting the req.Sources list to a list of storage.ObjectHandle as expected by the Go Storage Client.
var srcObjList []*storage.ObjectHandle
for _, src := range req.Sources {
currSrcObj := bh.bucket.Object(src.Name)
// Switching to the requested generation of the object. A zero source
// generation means the latest generation; we skip setting it because the
// latest one is used by default.
if src.Generation != 0 {
currSrcObj = currSrcObj.Generation(src.Generation)
}
srcObjList = append(srcObjList, currSrcObj)
}
// Composing Source Objects to Destination Object using Composer created through Go Storage Client.
attrs, err := dstObj.ComposerFrom(srcObjList...).Run(ctx)
if err != nil {
err = fmt.Errorf("error in composing object: %w", err)
return
}
// Converting attrs to type *Object.
o = storageutil.ObjectAttrsToBucketObject(attrs)
return
}
func (bh *bucketHandle) DeleteFolder(ctx context.Context, folderName string) (err error) {
defer func() {
err = gcs.GetGCSError(err)
}()
var callOptions []gax.CallOption
err = bh.controlClient.DeleteFolder(ctx, &controlpb.DeleteFolderRequest{
Name: fmt.Sprintf(FullFolderPathHNS, bh.bucketName, folderName),
}, callOptions...)
return
}
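// MoveObject renames an object via the server-side move API. A minimal
// sketch (illustrative names):
//
//    obj, err := bh.MoveObject(ctx, &gcs.MoveObjectRequest{
//        SrcName: "dir/old-name",
//        DstName: "dir/new-name",
//    })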
func (bh *bucketHandle) MoveObject(ctx context.Context, req *gcs.MoveObjectRequest) (o *gcs.Object, err error) {
defer func() {
err = gcs.GetGCSError(err)
}()
obj := bh.bucket.Object(req.SrcName)
// Switching to the requested generation of source object.
if req.SrcGeneration != 0 {
obj = obj.Generation(req.SrcGeneration)
}
// Putting a condition that the metaGeneration of the source should match *req.SrcMetaGenerationPrecondition for the move operation to occur.
if req.SrcMetaGenerationPrecondition != nil {
obj = obj.If(storage.Conditions{MetagenerationMatch: *req.SrcMetaGenerationPrecondition})
}
dstMoveObject := storage.MoveObjectDestination{
Object: req.DstName,
Conditions: nil,
}
attrs, err := obj.Move(ctx, dstMoveObject)
if err != nil {
err = fmt.Errorf("error in moving object: %w", err)
return
}
// Converting objAttrs to type *Object
o = storageutil.ObjectAttrsToBucketObject(attrs)
return
}
func (bh *bucketHandle) RenameFolder(ctx context.Context, folderName string, destinationFolderId string) (folder *gcs.Folder, err error) {
defer func() {
err = gcs.GetGCSError(err)
}()
var controlFolder *controlpb.Folder
req := &controlpb.RenameFolderRequest{
Name: fmt.Sprintf(FullFolderPathHNS, bh.bucketName, folderName),
DestinationFolderId: destinationFolderId,
}
resp, err := bh.controlClient.RenameFolder(ctx, req)
if err != nil {
err = fmt.Errorf("error in renaming folder: %w", err)
return
}
// Wait blocks until the long-running operation is completed,
// returning the response and any errors encountered.
controlFolder, err = resp.Wait(ctx)
if err != nil {
err = fmt.Errorf("error in getting result from renaming folder response: %w", err)
return
}
folder = gcs.GCSFolder(bh.bucketName, controlFolder)
return
}
// TODO: Consider adding this method to the bucket interface if additional
// layout options are needed in the future.
func (bh *bucketHandle) getStorageLayout() (*controlpb.StorageLayout, error) {
var callOptions []gax.CallOption
storageLayout, err := bh.controlClient.GetStorageLayout(context.Background(), &controlpb.GetStorageLayoutRequest{
Name: fmt.Sprintf("projects/_/buckets/%s/storageLayout", bh.bucketName),
Prefix: "",
RequestId: "",
}, callOptions...)
return storageLayout, err
}
func (bh *bucketHandle) GetFolder(ctx context.Context, folderName string) (folder *gcs.Folder, err error) {
defer func() {
err = gcs.GetGCSError(err)
}()
var callOptions []gax.CallOption
var clientFolder *controlpb.Folder
clientFolder, err = bh.controlClient.GetFolder(ctx, &controlpb.GetFolderRequest{
Name: fmt.Sprintf(FullFolderPathHNS, bh.bucketName, folderName),
}, callOptions...)
if err != nil {
err = fmt.Errorf("error getting metadata for folder: %s, %w", folderName, err)
return
}
folder = gcs.GCSFolder(bh.bucketName, clientFolder)
return
}
func (bh *bucketHandle) CreateFolder(ctx context.Context, folderName string) (folder *gcs.Folder, err error) {
defer func() {
err = gcs.GetGCSError(err)
}()
req := &controlpb.CreateFolderRequest{
Parent: fmt.Sprintf(FullBucketPathHNS, bh.bucketName),
FolderId: folderName,
Recursive: true,
}
clientFolder, err := bh.controlClient.CreateFolder(ctx, req)
if err != nil {
err = fmt.Errorf("error in creating folder: %w", err)
return
}
folder = gcs.GCSFolder(bh.bucketName, clientFolder)
return
}
func (bh *bucketHandle) NewMultiRangeDownloader(
ctx context.Context, req *gcs.MultiRangeDownloaderRequest) (mrd gcs.MultiRangeDownloader, err error) {
defer func() {
err = gcs.GetGCSError(err)
}()
obj := bh.bucket.Object(req.Name)
// Switching to the requested generation of object.
if req.Generation != 0 {
obj = obj.Generation(req.Generation)
}
if req.ReadCompressed {
obj = obj.ReadCompressed(true)
}
mrd, err = obj.NewMultiRangeDownloader(ctx)
return
}
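// isStorageConditionsNotEmpty relies on storage.Conditions being a comparable
// struct, so comparing against its zero value detects whether any condition
// was set. For example, storage.Conditions{MetagenerationMatch: 5} compares
// unequal to storage.Conditions{}, so it is reported as non-empty.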
func isStorageConditionsNotEmpty(conditions storage.Conditions) bool {
return conditions != (storage.Conditions{})
}