internal/gcsx/syncer.go (135 lines of code) (raw):

// Copyright 2015 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package gcsx import ( "fmt" "io" "time" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs" "golang.org/x/net/context" ) // Syncer is safe for concurrent access. type Syncer interface { // Given an object record and content that was originally derived from that // object's contents (and potentially modified): // // * If the temp file has not been modified, return a nil new object. // // * Otherwise, write out a new generation in the bucket (failing with // *gcs.PreconditionError if the source generation is no longer current). SyncObject( ctx context.Context, fileName string, srcObject *gcs.Object, content TempFile) (o *gcs.Object, err error) } // NewSyncer creates a syncer that syncs into the supplied bucket. // // When the source object has been changed only by appending, and the source // object's size is at least composeThreshold, we will "append" to it by writing // out a temporary blob and composing it with the source object. // // Temporary blobs have names beginning with tmpObjectPrefix. We make an effort // to delete them, but if we are interrupted for some reason we may not be able // to do so. Therefore the user should arrange for garbage collection. func NewSyncer( composeThreshold int64, chunkTransferTimeoutSecs int64, tmpObjectPrefix string, bucket gcs.Bucket) (os Syncer) { // Create the object creators. fullCreator := &fullObjectCreator{ bucket: bucket, } // Zonal buckets do not currently support Compose, so we always write objects // in their entirety. var composeCreator objectCreator if !bucket.BucketType().Zonal { composeCreator = newComposeObjectCreator( tmpObjectPrefix, bucket) } // And the syncer. os = newSyncer(composeThreshold, chunkTransferTimeoutSecs, fullCreator, composeCreator) return } //////////////////////////////////////////////////////////////////////// // fullObjectCreator //////////////////////////////////////////////////////////////////////// type fullObjectCreator struct { bucket gcs.Bucket } func (oc *fullObjectCreator) Create( ctx context.Context, objectName string, srcObject *gcs.Object, mtime *time.Time, chunkTransferTimeoutSecs int64, r io.Reader) (o *gcs.Object, err error) { req := gcs.NewCreateObjectRequest(srcObject, objectName, mtime, chunkTransferTimeoutSecs) req.Contents = r o, err = oc.bucket.CreateObject(ctx, req) if err != nil { err = fmt.Errorf("CreateObject: %w", err) return } return } //////////////////////////////////////////////////////////////////////// // syncer //////////////////////////////////////////////////////////////////////// // An implementation detail of syncer. See notes on newSyncer. type objectCreator interface { Create( ctx context.Context, objectName string, srcObject *gcs.Object, mtime *time.Time, chunkTransferTimeoutSecs int64, r io.Reader) (o *gcs.Object, err error) } // Create a syncer that stats the mutable content to see if it's dirty before // calling through to one of two object creators if the content is dirty: // // - fullCreator accepts the source object and the full contents with which it // should be overwritten. // // - composeCreator accepts the source object and the contents that should be // "appended" to it. // // composeThreshold controls the source object length at which we consider it // worthwhile to make the append optimization. It should be set to a value on // the order of the bandwidth to GCS times three times the round trip latency // to GCS (for a small create, a compose, and a delete). func newSyncer( composeThreshold int64, chunkTransferTimeoutSecs int64, fullCreator objectCreator, composeCreator objectCreator) (os Syncer) { os = &syncer{ composeThreshold: composeThreshold, chunkTransferTimeoutSecs: chunkTransferTimeoutSecs, fullCreator: fullCreator, composeCreator: composeCreator, } return } type syncer struct { composeThreshold int64 chunkTransferTimeoutSecs int64 fullCreator objectCreator composeCreator objectCreator } func (os *syncer) SyncObject( ctx context.Context, objectName string, srcObject *gcs.Object, content TempFile) (o *gcs.Object, err error) { // Stat the content. sr, err := content.Stat() if err != nil { err = fmt.Errorf("stat: %w", err) return } // Local files are not present on GCS, hence only fullCreator is // invoked and append flow is never triggered. if srcObject == nil { // Content.Stat() seeks the current position to end of file. Seek it back // to beginning of the file. _, err = content.Seek(0, 0) if err != nil { err = fmt.Errorf("error in seeking: %w", err) return } return os.fullCreator.Create(ctx, objectName, srcObject, sr.Mtime, os.chunkTransferTimeoutSecs, content) } // Make sure the dirty threshold makes sense. srcSize := int64(srcObject.Size) if sr.DirtyThreshold > srcSize { err = fmt.Errorf( "stat returned weird DirtyThreshold field: %d vs. %d", sr.DirtyThreshold, srcObject.Size) return } // If the content hasn't been dirtied (i.e. it is the same size as the source // object, and no bytes within the source object have been dirtied), we're // done. if sr.Size == srcSize && sr.DirtyThreshold == srcSize { return } // Sanity check: the branch above should ensure that by the time we get here, // the stat result's mtime is non-nil. if sr.Mtime == nil { err = fmt.Errorf("wacky stat result: %#v", sr) return } // Otherwise, we need to create a new generation. If the source object is // long enough, hasn't been dirtied, and has a low enough component count, // then we can make the optimization of not rewriting its contents. if os.composeCreator != nil && srcSize >= os.composeThreshold && sr.DirtyThreshold == srcSize && srcObject.ComponentCount < gcs.MaxComponentCount { _, err = content.Seek(srcSize, 0) if err != nil { err = fmt.Errorf("seek: %w", err) return } o, err = os.composeCreator.Create(ctx, objectName, srcObject, sr.Mtime, os.chunkTransferTimeoutSecs, content) } else { _, err = content.Seek(0, 0) if err != nil { err = fmt.Errorf("seek: %w", err) return } o, err = os.fullCreator.Create(ctx, objectName, srcObject, sr.Mtime, os.chunkTransferTimeoutSecs, content) } // Deal with errors. if err != nil { err = fmt.Errorf("create: %w", err) return } return }