internal/fs/handle/file.go (114 lines of code) (raw):

// Copyright 2015 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package handle import ( "errors" "fmt" "io" "github.com/googlecloudplatform/gcsfuse/v2/common" "github.com/googlecloudplatform/gcsfuse/v2/internal/cache/file" "github.com/googlecloudplatform/gcsfuse/v2/internal/fs/inode" "github.com/googlecloudplatform/gcsfuse/v2/internal/gcsx" "github.com/jacobsa/syncutil" "golang.org/x/net/context" ) type FileHandle struct { inode *inode.FileInode mu syncutil.InvariantMutex // A random reader configured to some (potentially previous) generation of // the object backing the inode, or nil. // // INVARIANT: If reader != nil, reader.CheckInvariants() doesn't panic. // // GUARDED_BY(mu) reader gcsx.RandomReader // fileCacheHandler is used to get file cache handle and read happens using that. // This will be nil if the file cache is disabled. fileCacheHandler *file.CacheHandler // cacheFileForRangeRead is also valid for cache workflow, if true, object content // will be downloaded for random reads as well too. cacheFileForRangeRead bool metricHandle common.MetricHandle // For now, we will consider the files which are open in append mode also as write, // as we are not doing anything special for append. When required we will // define an enum instead of boolean to hold the type of open. readOnly bool } // LOCKS_REQUIRED(fh.inode.mu) func NewFileHandle(inode *inode.FileInode, fileCacheHandler *file.CacheHandler, cacheFileForRangeRead bool, metricHandle common.MetricHandle, readOnly bool) (fh *FileHandle) { fh = &FileHandle{ inode: inode, fileCacheHandler: fileCacheHandler, cacheFileForRangeRead: cacheFileForRangeRead, metricHandle: metricHandle, readOnly: readOnly, } fh.inode.RegisterFileHandle(fh.readOnly) fh.mu = syncutil.NewInvariantMutex(fh.checkInvariants) return } // Destroy any resources associated with the handle, which must not be used // again. // LOCKS_REQUIRED(fh.mu) // LOCK_FUNCTION(fh.inode.mu) // UNLOCK_FUNCTION(fh.inode.mu) func (fh *FileHandle) Destroy() { // Deregister the fileHandle with the inode. fh.inode.Lock() fh.inode.DeRegisterFileHandle(fh.readOnly) fh.inode.Unlock() if fh.reader != nil { fh.reader.Destroy() } } // Inode returns the inode backing this handle. func (fh *FileHandle) Inode() *inode.FileInode { return fh.inode } func (fh *FileHandle) Lock() { fh.mu.Lock() } func (fh *FileHandle) Unlock() { fh.mu.Unlock() } // Equivalent to locking fh.Inode() and calling fh.Inode().Read, but may be // more efficient. // // LOCKS_REQUIRED(fh) // LOCKS_EXCLUDED(fh.inode) func (fh *FileHandle) Read(ctx context.Context, dst []byte, offset int64, sequentialReadSizeMb int32) (output []byte, n int, err error) { // Lock the inode and attempt to ensure that we have a reader for its current // state, or clear fh.reader if it's not possible to create one (probably // because the inode is dirty). fh.inode.Lock() // Ensure all pending writes to Zonal Buckets are flushed before issuing a read. // Updating inode state is not required here because inode state for Zonal Buckets will // be updated at time of BWH creation. _, err = fh.inode.SyncPendingBufferedWrites() if err != nil { fh.inode.Unlock() err = fmt.Errorf("fh.inode.SyncPendingBufferedWrites: %w", err) return } err = fh.tryEnsureReader(ctx, sequentialReadSizeMb) if err != nil { fh.inode.Unlock() err = fmt.Errorf("tryEnsureReader: %w", err) return } // If we have an appropriate reader, unlock the inode and use that. This // allows reads to proceed concurrently with other operations; in particular, // multiple reads can run concurrently. It's safe because the user can't tell // if a concurrent write started during or after a read. if fh.reader != nil { fh.inode.Unlock() var objectData gcsx.ObjectData objectData, err = fh.reader.ReadAt(ctx, dst, offset) switch { case errors.Is(err, io.EOF): err = io.EOF return case err != nil: err = fmt.Errorf("fh.reader.ReadAt: %w", err) return } output = objectData.DataBuf n = objectData.Size return } // Otherwise we must fall through to the inode. defer fh.inode.Unlock() n, err = fh.inode.Read(ctx, dst, offset) // Setting dst as output since output is used by the caller to read the data. output = dst return } //////////////////////////////////////////////////////////////////////// // Helpers //////////////////////////////////////////////////////////////////////// // LOCKS_REQUIRED(fh.mu) func (fh *FileHandle) checkInvariants() { // INVARIANT: If reader != nil, reader.CheckInvariants() doesn't panic. if fh.reader != nil { fh.reader.CheckInvariants() } } // If possible, ensure that fh.reader is set to an appropriate random reader // for the current state of the inode. Otherwise set it to nil. // // LOCKS_REQUIRED(fh) // LOCKS_REQUIRED(fh.inode) func (fh *FileHandle) tryEnsureReader(ctx context.Context, sequentialReadSizeMb int32) (err error) { // If content cache enabled, CacheEnsureContent forces the file handler to fall through to the inode // and fh.inode.SourceGenerationIsAuthoritative() will return false err = fh.inode.CacheEnsureContent(ctx) if err != nil { return } // If the inode is dirty, there's nothing we can do. Throw away our reader if // we have one. if !fh.inode.SourceGenerationIsAuthoritative() { if fh.reader != nil { fh.reader.Destroy() fh.reader = nil } return } // If we already have a reader, and it's at the appropriate generation, we // can use it. Otherwise we must throw it away. if fh.reader != nil { if fh.reader.Object().Generation == fh.inode.SourceGeneration().Object { // Update reader object size to source object size. fh.reader.Object().Size = fh.inode.SourceGeneration().Size return } fh.reader.Destroy() fh.reader = nil } // Attempt to create an appropriate reader. rr := gcsx.NewRandomReader(fh.inode.Source(), fh.inode.Bucket(), sequentialReadSizeMb, fh.fileCacheHandler, fh.cacheFileForRangeRead, fh.metricHandle, &fh.inode.MRDWrapper) fh.reader = rr return }