internal/gcsx/multi_range_downloader_wrapper.go (178 lines of code) (raw):

// Copyright 2025 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package gcsx import ( "bytes" "fmt" "io" "sync" "time" "github.com/google/uuid" "github.com/googlecloudplatform/gcsfuse/v2/common" "github.com/googlecloudplatform/gcsfuse/v2/internal/clock" "github.com/googlecloudplatform/gcsfuse/v2/internal/logger" "github.com/googlecloudplatform/gcsfuse/v2/internal/monitor" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs" "golang.org/x/net/context" ) // Timeout value which determines when the MultiRangeDownloader will be closed after // it's refcount reaches 0. const multiRangeDownloaderTimeout = 60 * time.Second func NewMultiRangeDownloaderWrapper(bucket gcs.Bucket, object *gcs.MinObject) (MultiRangeDownloaderWrapper, error) { return NewMultiRangeDownloaderWrapperWithClock(bucket, object, clock.RealClock{}) } func NewMultiRangeDownloaderWrapperWithClock(bucket gcs.Bucket, object *gcs.MinObject, clock clock.Clock) (MultiRangeDownloaderWrapper, error) { if object == nil { return MultiRangeDownloaderWrapper{}, fmt.Errorf("NewMultiRangeDownloaderWrapperWithClock: Missing MinObject") } // In case of a local inode, MRDWrapper would be created with an empty minObject (i.e. with a minObject without any information) // and when the object is actually created, MRDWrapper would be updated using SetMinObject method. return MultiRangeDownloaderWrapper{ clock: clock, bucket: bucket, object: object, }, nil } type readResult struct { bytesRead int err error } type MultiRangeDownloaderWrapper struct { // Holds the object implementing MultiRangeDownloader interface. Wrapped gcs.MultiRangeDownloader // Bucket and object details for MultiRangeDownloader. // Object should not be nil. object *gcs.MinObject bucket gcs.Bucket // Refcount is used to determine when to close the MultiRangeDownloader. refCount int // Mutex is used to synchronize access over refCount. mu sync.Mutex // Holds the cancel function, which can be called to cancel the cleanup function. cancelCleanup context.CancelFunc // Used for waiting for timeout (helps us in mocking the functionality). clock clock.Clock } // Sets the gcs.MinObject stored in the wrapper to passed value, only if it's non nil. func (mrdWrapper *MultiRangeDownloaderWrapper) SetMinObject(minObj *gcs.MinObject) error { if minObj == nil { return fmt.Errorf("MultiRangeDownloaderWrapper::SetMinObject: Missing MinObject") } mrdWrapper.object = minObj return nil } // Returns the minObject stored in MultiRangeDownloaderWrapper. Used only for unit testing. func (mrdWrapper *MultiRangeDownloaderWrapper) GetMinObject() *gcs.MinObject { return mrdWrapper.object } // Returns current refcount. func (mrdWrapper *MultiRangeDownloaderWrapper) GetRefCount() int { mrdWrapper.mu.Lock() defer mrdWrapper.mu.Unlock() return mrdWrapper.refCount } // Increment the refcount and cancel any running cleanup function. // This method should be called exactly once per user of this wrapper. // It has to be called before using the MultiRangeDownloader. func (mrdWrapper *MultiRangeDownloaderWrapper) IncrementRefCount() { mrdWrapper.mu.Lock() defer mrdWrapper.mu.Unlock() mrdWrapper.refCount++ if mrdWrapper.cancelCleanup != nil { mrdWrapper.cancelCleanup() mrdWrapper.cancelCleanup = nil } } // Decrement the refcount. In case refcount reaches 0, cleanup the MRD. // Returns error on invalid usage. // This method should be called exactly once per user of this wrapper // when MultiRangeDownloader is no longer needed & can be cleaned up. func (mrdWrapper *MultiRangeDownloaderWrapper) DecrementRefCount() (err error) { mrdWrapper.mu.Lock() defer mrdWrapper.mu.Unlock() if mrdWrapper.refCount <= 0 { err = fmt.Errorf("MultiRangeDownloaderWrapper DecrementRefCount: Refcount cannot be negative") return } mrdWrapper.refCount-- if mrdWrapper.refCount == 0 && mrdWrapper.Wrapped != nil { mrdWrapper.Wrapped.Close() mrdWrapper.Wrapped = nil // TODO (b/391508479): Start using cleanup function when MRD recreation is handled // mrdWrapper.cleanupMultiRangeDownloader() } return } // Spawns a cancellable go routine to close the MRD after the timeout. // Always call after taking MultiRangeDownloaderWrapper's mutex lock. func (mrdWrapper *MultiRangeDownloaderWrapper) cleanupMultiRangeDownloader() { closeMRD := func(ctx context.Context) { select { case <-mrdWrapper.clock.After(multiRangeDownloaderTimeout): mrdWrapper.mu.Lock() defer mrdWrapper.mu.Unlock() if mrdWrapper.refCount == 0 && mrdWrapper.Wrapped != nil { mrdWrapper.Wrapped.Close() mrdWrapper.Wrapped = nil mrdWrapper.cancelCleanup = nil } case <-ctx.Done(): return } } ctx, cancel := context.WithCancel(context.Background()) mrdWrapper.cancelCleanup = cancel go closeMRD(ctx) } // Ensures that MultiRangeDownloader exists, creating it if it does not exist. func (mrdWrapper *MultiRangeDownloaderWrapper) ensureMultiRangeDownloader() (err error) { if mrdWrapper.object == nil || mrdWrapper.bucket == nil { return fmt.Errorf("ensureMultiRangeDownloader error: Missing minObject or bucket") } // Create the MRD if it does not exist. // In case the existing MRD is unusable due to closed stream, recreate the MRD. if mrdWrapper.Wrapped == nil || mrdWrapper.Wrapped.Error() != nil { mrdWrapper.mu.Lock() defer mrdWrapper.mu.Unlock() // Checking if the mrdWrapper state is same after taking the lock. if mrdWrapper.Wrapped == nil || mrdWrapper.Wrapped.Error() != nil { var mrd gcs.MultiRangeDownloader mrd, err = mrdWrapper.bucket.NewMultiRangeDownloader(context.Background(), &gcs.MultiRangeDownloaderRequest{ Name: mrdWrapper.object.Name, Generation: mrdWrapper.object.Generation, ReadCompressed: mrdWrapper.object.HasContentEncodingGzip(), }) if err == nil { // Updating mrdWrapper.Wrapped only when MRD creation was successful. mrdWrapper.Wrapped = mrd } } } return } // Reads the data using MultiRangeDownloader. func (mrdWrapper *MultiRangeDownloaderWrapper) Read(ctx context.Context, buf []byte, startOffset int64, endOffset int64, timeout time.Duration, metricHandle common.MetricHandle) (bytesRead int, err error) { // Bidi Api with 0 as read_limit means no limit whereas we do not want to read anything with empty buffer. // Hence, handling it separately. if len(buf) == 0 { return 0, nil } err = mrdWrapper.ensureMultiRangeDownloader() if err != nil { err = fmt.Errorf("MultiRangeDownloaderWrapper::Read: Error in creating MultiRangeDownloader: %v", err) return } // We will only read what is requested by the client. Hence, capping end to the requested value. if endOffset > startOffset+int64(len(buf)) { endOffset = startOffset + int64(len(buf)) } buffer := bytes.NewBuffer(buf) buffer.Reset() done := make(chan readResult, 1) mu := sync.Mutex{} defer func() { mu.Lock() close(done) done = nil mu.Unlock() }() requestId := uuid.New() logger.Tracef("%.13v <- MultiRangeDownloader::Add (%s, [%d, %d))", requestId, mrdWrapper.object.Name, startOffset, endOffset) start := time.Now() mrdWrapper.Wrapped.Add(buffer, startOffset, endOffset-startOffset, func(offsetAddCallback int64, bytesReadAddCallback int64, e error) { defer func() { mu.Lock() if done != nil { done <- readResult{bytesRead: int(bytesReadAddCallback), err: e} } mu.Unlock() }() if e != nil && e != io.EOF { e = fmt.Errorf("Error in Add Call: %w", e) } }) select { case <-time.After(timeout): err = fmt.Errorf("Timeout") case <-ctx.Done(): err = fmt.Errorf("Context Cancelled: %w", ctx.Err()) case res := <-done: bytesRead = res.bytesRead err = res.err } duration := time.Since(start) monitor.CaptureMultiRangeDownloaderMetrics(ctx, metricHandle, "MultiRangeDownloader::Add", start) errDesc := "OK" if err != nil { errDesc = err.Error() err = fmt.Errorf("MultiRangeDownloaderWrapper::Read: %w", err) logger.Errorf("%v", err) } logger.Tracef("%.13v -> MultiRangeDownloader::Add (%s, [%d, %d)) (%v): %v", requestId, mrdWrapper.object.Name, startOffset, endOffset, duration, errDesc) return }