internal/gcsx/compose_object_creator.go (115 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gcsx
import (
"crypto/rand"
"errors"
"fmt"
"io"
"time"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"golang.org/x/net/context"
)
// newComposeObjectCreator returns an objectCreator that takes a source object
// plus the contents to be "appended" to it, staging temporary objects in
// bucket under names that begin with prefix.
//
// The Create method tries to clean up the temporary objects it leaves behind,
// but that cleanup can fail, so callers should arrange for garbage collection
// of stale objects under prefix.
//
// Create guarantees to return *gcs.PreconditionError when the source object
// has been clobbered.
func newComposeObjectCreator(
	prefix string,
	bucket gcs.Bucket) (oc objectCreator) {
	return &composeObjectCreator{
		prefix: prefix,
		bucket: bucket,
	}
}
////////////////////////////////////////////////////////////////////////
// Implementation
////////////////////////////////////////////////////////////////////////
// composeObjectCreator is the objectCreator returned by
// newComposeObjectCreator. Its Create method writes the new contents to a
// temporary object and then composes the source object with that temporary
// object.
type composeObjectCreator struct {
// prefix is prepended to a random hex suffix to form the name of each
// temporary object (see chooseName).
prefix string
// bucket is where temporary objects are created and deleted, and where the
// compose operation is performed.
bucket gcs.Bucket
}
// chooseName returns a name for a temporary object: oc.prefix followed by 16
// lowercase hex digits encoding 64 bits read from crypto/rand. An error is
// returned only if the random source cannot be read.
func (oc *composeObjectCreator) chooseName() (name string, err error) {
	// Draw eight bytes from the cryptographic random source.
	var raw [8]byte
	if _, err = io.ReadFull(rand.Reader, raw[:]); err != nil {
		err = fmt.Errorf("ReadFull: %w", err)
		return
	}

	// Assemble the bytes into a 64-bit value, least-significant byte first.
	var id uint64
	for i, b := range raw {
		id |= uint64(b) << (8 * uint(i))
	}

	// Render the value as a fixed-width hex suffix after the prefix.
	name = fmt.Sprintf("%s%016x", oc.prefix, id)
	return
}
// ObjectName param is present here for consistency between fullObjectCreator
// and composeObjectCreator. ObjectName is not used in append flow since
// srcObject.Name gives the objectName.
func (oc *composeObjectCreator) Create(
ctx context.Context,
objectName string,
srcObject *gcs.Object,
mtime *time.Time,
chunkTransferTimeoutSecs int64,
r io.Reader) (o *gcs.Object, err error) {
// Choose a name for a temporary object.
tmpName, err := oc.chooseName()
if err != nil {
err = fmt.Errorf("chooseName: %w", err)
return
}
// Create a temporary object containing the additional contents.
req := gcs.NewCreateObjectRequest(nil, tmpName, nil, chunkTransferTimeoutSecs)
req.Contents = r
tmp, err := oc.bucket.CreateObject(ctx, req)
if err != nil {
err = fmt.Errorf("CreateObject: %w", err)
return
}
// Attempt to delete the temporary object when we're done.
defer func() {
deleteErr := oc.bucket.DeleteObject(
ctx,
&gcs.DeleteObjectRequest{
Name: tmp.Name,
Generation: 0, // Delete the latest generation of temporary object.
})
if err == nil && deleteErr != nil {
err = fmt.Errorf("DeleteObject: %w", deleteErr)
}
}()
MetadataMap := make(map[string]string)
/* Copy Metadata fields from src object to new object generated by compose. */
for key, value := range srcObject.Metadata {
MetadataMap[key] = value
}
if mtime != nil {
MetadataMap[gcs.MtimeMetadataKey] = mtime.UTC().Format(time.RFC3339Nano)
}
// Compose the old contents plus the new over the old.
o, err = oc.bucket.ComposeObjects(
ctx,
&gcs.ComposeObjectsRequest{
DstName: srcObject.Name,
DstGenerationPrecondition: &srcObject.Generation,
DstMetaGenerationPrecondition: &srcObject.MetaGeneration,
Sources: []gcs.ComposeSource{
gcs.ComposeSource{
Name: srcObject.Name,
Generation: srcObject.Generation,
},
gcs.ComposeSource{
Name: tmp.Name,
Generation: tmp.Generation,
},
},
Metadata: MetadataMap,
CacheControl: srcObject.CacheControl,
ContentDisposition: srcObject.ContentDisposition,
ContentEncoding: srcObject.ContentEncoding,
ContentType: srcObject.ContentType,
CustomTime: srcObject.CustomTime,
EventBasedHold: srcObject.EventBasedHold,
StorageClass: srcObject.StorageClass,
})
if err != nil {
// A not found error means that either the source object was clobbered or the
// temporary object was. The latter is unlikely, so we signal a precondition
// error.
var notFoundErr *gcs.NotFoundError
if errors.As(err, ¬FoundErr) {
err = &gcs.PreconditionError{
Err: err,
}
}
err = fmt.Errorf("ComposeObjects: %w", err)
return
}
return
}