// internal/fs/fs.go

// Copyright 2015 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package fs import ( "context" "errors" "fmt" "io" iofs "io/fs" "math" "os" "path" "reflect" "strings" "syscall" "time" "golang.org/x/sync/semaphore" "github.com/googlecloudplatform/gcsfuse/v2/cfg" "github.com/googlecloudplatform/gcsfuse/v2/common" "github.com/googlecloudplatform/gcsfuse/v2/internal/cache/file" "github.com/googlecloudplatform/gcsfuse/v2/internal/cache/file/downloader" "github.com/googlecloudplatform/gcsfuse/v2/internal/cache/lru" cacheutil "github.com/googlecloudplatform/gcsfuse/v2/internal/cache/util" "github.com/googlecloudplatform/gcsfuse/v2/internal/contentcache" "github.com/googlecloudplatform/gcsfuse/v2/internal/fs/handle" "github.com/googlecloudplatform/gcsfuse/v2/internal/fs/inode" "github.com/googlecloudplatform/gcsfuse/v2/internal/gcsx" "github.com/googlecloudplatform/gcsfuse/v2/internal/locker" "github.com/googlecloudplatform/gcsfuse/v2/internal/logger" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs" "github.com/googlecloudplatform/gcsfuse/v2/internal/util" "github.com/jacobsa/fuse" "github.com/jacobsa/fuse/fuseops" "github.com/jacobsa/fuse/fuseutil" "github.com/jacobsa/timeutil" ) type ServerConfig struct { // A clock used for cache expiration. It is *not* used for inode times, for // which we use the wall clock. CacheClock timeutil.Clock // The bucket manager is responsible for setting up buckets. 
BucketManager gcsx.BucketManager // The name of the specific GCS bucket to be mounted. If it's empty or "_", // all accessible GCS buckets are mounted as subdirectories of the FS root. BucketName string // LocalFileCache LocalFileCache bool // The temporary directory to use for local caching, or the empty string to // use the system default. TempDir string // By default, if a bucket contains the object "foo/bar" but no object named // "foo/", it's as if the directory doesn't exist. This allows us to have // non-flaky name resolution code. // // Setting this bool to true enables a mode where object listings are // consulted to allow for the directory in the situation above to exist. Note // that this has drawbacks in the form of name resolution flakiness and // surprising behavior. // // See docs/semantics.md for more info. ImplicitDirectories bool // By default, if a file/directory does not exist in GCS, this nonexistent state is // not cached in type cache. So the inode lookup request will hit GCS every // time. // // Setting this bool to true enables the nonexistent type cache so if the // inode state is NonexistentType in type cache, the lookup request will // return nil immediately. EnableNonexistentTypeCache bool // How long to allow the kernel to cache inode attributes. // // Any given object generation in GCS is immutable, and a new generation // results in a new inode number. So every update from a remote system results // in a new inode number, and it's therefore safe to allow the kernel to cache // inode attributes. // // The one exception to the above logic is that objects can be _deleted_, in // which case stat::st_nlink changes. So choosing this value comes down to // whether you care about that field being up to date. InodeAttributeCacheTTL time.Duration // If non-zero, each directory will maintain a cache from child name to // information about whether that name exists as a file and/or directory. 
// This may speed up calls to look up and stat inodes, especially when // combined with a stat-caching GCS bucket, but comes at the cost of // consistency: if the child is removed and recreated with a different type // before the expiration, we may fail to find it. DirTypeCacheTTL time.Duration // The UID and GID that owns all inodes in the file system. Uid uint32 Gid uint32 // Permissions bits to use for files and directories. No bits outside of // os.ModePerm may be set. FilePerms os.FileMode DirPerms os.FileMode // Allow renaming a directory containing fewer descendants than this limit. RenameDirLimit int64 // File chunk size to read from GCS in one call. Specified in MB. SequentialReadSizeMb int32 // NewConfig has all the config specified by the user using config-file or CLI flags. NewConfig *cfg.Config MetricHandle common.MetricHandle } // Create a fuse file system server according to the supplied configuration. func NewFileSystem(ctx context.Context, serverCfg *ServerConfig) (fuseutil.FileSystem, error) { // Check permissions bits. if serverCfg.FilePerms&^os.ModePerm != 0 { return nil, fmt.Errorf("illegal file perms: %v", serverCfg.FilePerms) } if serverCfg.DirPerms&^os.ModePerm != 0 { return nil, fmt.Errorf("illegal dir perms: %v", serverCfg.FilePerms) } mtimeClock := timeutil.RealClock() contentCache := contentcache.New(serverCfg.TempDir, mtimeClock) if serverCfg.LocalFileCache { err := contentCache.RecoverCache() if err != nil { fmt.Printf("Encountered error retrieving files from cache directory, disabling local file cache: %v", err) serverCfg.LocalFileCache = false } } // Create file cache handler if cache is enabled by user. Cache is considered // enabled only if cache-dir is not empty and file-cache:max-size-mb is non 0. var fileCacheHandler *file.CacheHandler if cfg.IsFileCacheEnabled(serverCfg.NewConfig) { var err error fileCacheHandler, err = createFileCacheHandler(serverCfg) if err != nil { return nil, err } } // Set up the basic struct. 
fs := &fileSystem{ mtimeClock: mtimeClock, cacheClock: serverCfg.CacheClock, bucketManager: serverCfg.BucketManager, localFileCache: serverCfg.LocalFileCache, contentCache: contentCache, implicitDirs: serverCfg.ImplicitDirectories, enableNonexistentTypeCache: serverCfg.EnableNonexistentTypeCache, inodeAttributeCacheTTL: serverCfg.InodeAttributeCacheTTL, dirTypeCacheTTL: serverCfg.DirTypeCacheTTL, kernelListCacheTTL: cfg.ListCacheTTLSecsToDuration(serverCfg.NewConfig.FileSystem.KernelListCacheTtlSecs), renameDirLimit: serverCfg.RenameDirLimit, sequentialReadSizeMb: serverCfg.SequentialReadSizeMb, uid: serverCfg.Uid, gid: serverCfg.Gid, fileMode: serverCfg.FilePerms, dirMode: serverCfg.DirPerms | os.ModeDir, inodes: make(map[fuseops.InodeID]inode.Inode), nextInodeID: fuseops.RootInodeID + 1, generationBackedInodes: make(map[inode.Name]inode.GenerationBackedInode), implicitDirInodes: make(map[inode.Name]inode.DirInode), folderInodes: make(map[inode.Name]inode.DirInode), localFileInodes: make(map[inode.Name]inode.Inode), handles: make(map[fuseops.HandleID]interface{}), newConfig: serverCfg.NewConfig, fileCacheHandler: fileCacheHandler, cacheFileForRangeRead: serverCfg.NewConfig.FileCache.CacheFileForRangeRead, metricHandle: serverCfg.MetricHandle, enableAtomicRenameObject: serverCfg.NewConfig.EnableAtomicRenameObject, globalMaxWriteBlocksSem: semaphore.NewWeighted(serverCfg.NewConfig.Write.GlobalMaxBlocks), } // Set up root bucket var root inode.DirInode if serverCfg.BucketName == "" || serverCfg.BucketName == "_" { logger.Info("Set up root directory for all accessible buckets") root = makeRootForAllBuckets(fs) } else { logger.Info("Set up root directory for bucket " + serverCfg.BucketName) syncerBucket, err := fs.bucketManager.SetUpBucket(ctx, serverCfg.BucketName, false, fs.metricHandle) if err != nil { return nil, fmt.Errorf("SetUpBucket: %w", err) } root = makeRootForBucket(ctx, fs, syncerBucket) } root.Lock() root.IncrementLookupCount() 
fs.inodes[fuseops.RootInodeID] = root fs.implicitDirInodes[root.Name()] = root fs.folderInodes[root.Name()] = root root.Unlock() // Set up invariant checking. fs.mu = locker.New("FS", fs.checkInvariants) return fs, nil } func createFileCacheHandler(serverCfg *ServerConfig) (fileCacheHandler *file.CacheHandler, err error) { var sizeInBytes uint64 // -1 means unlimited size for cache, the underlying LRU cache doesn't handle // -1 explicitly, hence we pass MaxUint64 as capacity in that case. if serverCfg.NewConfig.FileCache.MaxSizeMb == -1 { sizeInBytes = math.MaxUint64 } else { sizeInBytes = uint64(serverCfg.NewConfig.FileCache.MaxSizeMb) * cacheutil.MiB } fileInfoCache := lru.NewCache(sizeInBytes) cacheDir := string(serverCfg.NewConfig.CacheDir) // Adding a new directory inside cacheDir to keep file-cache separate from // metadata cache if and when we support storing metadata cache on disk in // the future. cacheDir = path.Join(cacheDir, cacheutil.FileCache) filePerm := cacheutil.DefaultFilePerm dirPerm := cacheutil.DefaultDirPerm cacheDirErr := cacheutil.CreateCacheDirectoryIfNotPresentAt(cacheDir, dirPerm) if cacheDirErr != nil { return nil, fmt.Errorf("createFileCacheHandler: while creating file cache directory: %w", cacheDirErr) } jobManager := downloader.NewJobManager(fileInfoCache, filePerm, dirPerm, cacheDir, serverCfg.SequentialReadSizeMb, &serverCfg.NewConfig.FileCache, serverCfg.MetricHandle) fileCacheHandler = file.NewCacheHandler(fileInfoCache, jobManager, cacheDir, filePerm, dirPerm) return } func makeRootForBucket( ctx context.Context, fs *fileSystem, syncerBucket gcsx.SyncerBucket) inode.DirInode { return inode.NewDirInode( fuseops.RootInodeID, inode.NewRootName(""), fuseops.InodeAttributes{ Uid: fs.uid, Gid: fs.gid, Mode: fs.dirMode, // We guarantee only that directory times be "reasonable". 
Atime: fs.mtimeClock.Now(), Ctime: fs.mtimeClock.Now(), Mtime: fs.mtimeClock.Now(), }, fs.implicitDirs, fs.newConfig.List.EnableEmptyManagedFolders, fs.enableNonexistentTypeCache, fs.dirTypeCacheTTL, &syncerBucket, fs.mtimeClock, fs.cacheClock, fs.newConfig.MetadataCache.TypeCacheMaxSizeMb, fs.newConfig.EnableHns, ) } func makeRootForAllBuckets(fs *fileSystem) inode.DirInode { return inode.NewBaseDirInode( fuseops.RootInodeID, inode.NewRootName(""), fuseops.InodeAttributes{ Uid: fs.uid, Gid: fs.gid, Mode: fs.dirMode, // We guarantee only that directory times be "reasonable". Atime: fs.mtimeClock.Now(), Ctime: fs.mtimeClock.Now(), Mtime: fs.mtimeClock.Now(), }, fs.bucketManager, fs.metricHandle, ) } //////////////////////////////////////////////////////////////////////// // fileSystem type //////////////////////////////////////////////////////////////////////// // LOCK ORDERING // // Let FS be the file system lock. Define a strict partial order < as follows: // // 1. For any inode lock I, I < FS. // 2. For any handle lock H and inode lock I, H < I. // // We follow the rule "acquire A then B only if A < B". // // In other words: // // * Don't hold multiple handle locks at the same time. // * Don't hold multiple inode locks at the same time. // * Don't acquire inode locks before handle locks. // * Don't acquire file system locks before either. // // The intuition is that we hold inode and handle locks for long-running // operations, and we don't want to block the entire file system on those. // // See https://tinyurl.com/4nh4w7u9 for more discussion, including an informal // proof that a strict partial order is sufficient. 
type fileSystem struct {
	fuseutil.NotImplementedFileSystem

	/////////////////////////
	// Dependencies
	/////////////////////////

	mtimeClock    timeutil.Clock
	cacheClock    timeutil.Clock
	bucketManager gcsx.BucketManager

	/////////////////////////
	// Constant data
	/////////////////////////

	localFileCache             bool
	contentCache               *contentcache.ContentCache
	implicitDirs               bool
	enableNonexistentTypeCache bool
	inodeAttributeCacheTTL     time.Duration
	dirTypeCacheTTL            time.Duration

	// kernelListCacheTTL specifies the duration to keep the readdir response cached
	// in kernel. After ttl, gcsfuse, (filesystem) on next opendir call (just before as part
	// of next list call) from user, asks the kernel to evict the old cache entries.
	kernelListCacheTTL time.Duration

	renameDirLimit       int64
	sequentialReadSizeMb int32

	// The user and group owning everything in the file system.
	uid uint32
	gid uint32

	// Mode bits for all inodes.
	fileMode os.FileMode
	dirMode  os.FileMode

	/////////////////////////
	// Mutable state
	/////////////////////////

	// A lock protecting the state of the file system struct itself (distinct
	// from per-inode locks). Make sure to see the notes on lock ordering above.
	mu locker.Locker

	// The next inode ID to hand out. We assume that this will never overflow,
	// since even if we were handing out inode IDs at 4 GHz, it would still take
	// over a century to do so.
	//
	// GUARDED_BY(mu)
	nextInodeID fuseops.InodeID

	// The collection of live inodes, keyed by inode ID. No ID less than
	// fuseops.RootInodeID is ever used.
	//
	// INVARIANT: For all keys k, fuseops.RootInodeID <= k < nextInodeID
	// INVARIANT: For all keys k, inodes[k].ID() == k
	// INVARIANT: inodes[fuseops.RootInodeID] is missing or of type inode.DirInode
	// INVARIANT: For all v, if v.Name().IsDir() then v is inode.DirInode
	//
	// GUARDED_BY(mu)
	inodes map[fuseops.InodeID]inode.Inode

	// A map from object name to an inode for that name backed by a GCS object.
	// Populated during the name -> inode lookup process, cleared during the
	// forget inode process.
	//
	// Entries may be stale for two reasons:
	//
	//  1. There is a newer generation in GCS, not caused by the inode. The next
	//     name lookup will detect this by statting the object, acquiring the
	//     inode's lock (to get an up to date look at what the latest generation
	//     the inode caused was), and replacing the entry if the inode's
	//     generation is less than the stat generation.
	//
	//  2. The object no longer exists. This is harmless; the name lookup process
	//     will return ENOENT before it ever consults this map. Eventually the
	//     kernel will send ForgetInodeOp and we will clear the entry.
	//
	// Crucially, we never replace an up to date entry with a stale one. If the
	// name lookup process sees that the stat result is older than the inode, it
	// starts over, statting again.
	//
	// Note that there is no invariant that says *all* of the object-backed
	// inodes are represented here because we may have multiple distinct inodes
	// for a given name existing concurrently if we observe an object generation
	// that was not caused by our existing inode (e.g. if the file is clobbered
	// remotely). We must retain the old inode until the kernel tells us to
	// forget it.
	//
	// INVARIANT: For each k/v, v.Name() == k
	// INVARIANT: For each value v, inodes[v.ID()] == v
	//
	// GUARDED_BY(mu)
	generationBackedInodes map[inode.Name]inode.GenerationBackedInode

	// A map from object name to the implicit directory inode that represents
	// that name, if any. There can be at most one implicit directory inode for a
	// given name accessible to us at any given time.
	//
	// INVARIANT: For each k/v, v.Name() == k
	// INVARIANT: For each value v, inodes[v.ID()] == v
	// INVARIANT: For each value v, v is not ExplicitDirInode
	// INVARIANT: For each in in inodes such that in is DirInode but not
	//            ExplicitDirInode, implicitDirInodes[in.Name()] == in
	//
	// GUARDED_BY(mu)
	implicitDirInodes map[inode.Name]inode.DirInode

	// A map from folder name to the folder inode that represents
	// that name, if any. There can be at most one folder inode for a
	// given name accessible to us at any given time.
	//
	// INVARIANT: For each k/v, v.Name() == k
	// INVARIANT: For each value v, inodes[v.ID()] == v
	//
	// GUARDED_BY(mu)
	folderInodes map[inode.Name]inode.DirInode

	// A map from object name to the local fileInode that represents
	// that name. There can be at most one local file inode for a
	// given name accessible to us at any given time.
	//
	// INVARIANT: For each k/v, v.Name() == k
	// INVARIANT: For each value v, inodes[v.ID()] == v
	// INVARIANT: For each value v, v is a *inode.FileInode
	//            (checkInvariantsForLocalFileInodes panics otherwise)
	// INVARIANT: For each f in inodes that is local fileInode,
	//            localFileInodes[f.Name()] == f
	//
	// GUARDED_BY(mu)
	localFileInodes map[inode.Name]inode.Inode

	// The collection of live handles, keyed by handle ID.
	//
	// INVARIANT: All values are of type *dirHandle or *handle.FileHandle
	//
	// GUARDED_BY(mu)
	handles map[fuseops.HandleID]interface{}

	// The next handle ID to hand out. We assume that this will never overflow.
	//
	// INVARIANT: For all keys k in handles, k < nextHandleID
	//
	// GUARDED_BY(mu)
	nextHandleID fuseops.HandleID

	// newConfig specified by the user using config-file flag and CLI flags.
	newConfig *cfg.Config

	// fileCacheHandler manages read only file cache. It is non-nil only when
	// file cache is enabled at the time of mounting.
	fileCacheHandler *file.CacheHandler

	// cacheFileForRangeRead when true downloads file into cache even for
	// random file access.
	cacheFileForRangeRead bool

	metricHandle common.MetricHandle

	enableAtomicRenameObject bool

	// Limits the max number of blocks that can be created across file system when
	// streaming writes are enabled.
	globalMaxWriteBlocksSem *semaphore.Weighted
}

////////////////////////////////////////////////////////////////////////
// Helpers
////////////////////////////////////////////////////////////////////////

// checkInvariantsForLocalFileInodes panics if any invariant of the
// localFileInodes map is violated. Called with fs.mu held via the
// invariant-checking locker.
func (fs *fileSystem) checkInvariantsForLocalFileInodes() {
	// INVARIANT: For each k/v, v.Name() == k
	for k, v := range fs.localFileInodes {
		if !(v.Name() == k) {
			panic(fmt.Sprintf(
				"Unexpected name: \"%s\" vs. \"%s\"",
				v.Name(),
				k))
		}
	}

	// INVARIANT: For each value v, inodes[v.ID()] == v
	for _, v := range fs.localFileInodes {
		if fs.inodes[v.ID()] != v {
			panic(fmt.Sprintf(
				"Mismatch for ID %v: %v %v",
				v.ID(),
				fs.inodes[v.ID()],
				v))
		}
	}

	// INVARIANT: every value is a *inode.FileInode.
	for _, v := range fs.localFileInodes {
		if _, ok := v.(*inode.FileInode); !ok {
			panic(fmt.Sprintf(
				"Unexpected file inode %d, type %T",
				v.ID(),
				v))
		}
	}

	// INVARIANT: For each f in inodes that is local fileInode
	//            (and not unlinked), localFileInodes[f.Name()] == f
	for _, in := range fs.inodes {
		fileInode, ok := in.(*inode.FileInode)
		if ok && fileInode.IsLocal() && !fileInode.IsUnlinked() {
			if !(fs.localFileInodes[in.Name()] == in) {
				panic(fmt.Sprintf(
					"localFileInodes mismatch: %q %v %v",
					in.Name(),
					fs.localFileInodes[in.Name()],
					in))
			}
		}
	}
}

// checkInvariantsForFolderInodes panics if any invariant of the folderInodes
// map is violated.
func (fs *fileSystem) checkInvariantsForFolderInodes() {
	// INVARIANT: For each k/v, v.Name() == k
	for k, v := range fs.folderInodes {
		if !(v.Name() == k) {
			panic(fmt.Sprintf(
				"Unexpected name: \"%s\" vs. \"%s\"",
				v.Name(),
				k))
		}
	}

	// INVARIANT: For each value v, inodes[v.ID()] == v
	for _, v := range fs.folderInodes {
		if fs.inodes[v.ID()] != v {
			panic(fmt.Sprintf(
				"Mismatch for ID %v: %v %v",
				v.ID(),
				fs.inodes[v.ID()],
				v))
		}
	}
}

// checkInvariantsForImplicitDirs panics if any invariant of the
// implicitDirInodes map is violated.
func (fs *fileSystem) checkInvariantsForImplicitDirs() {
	// INVARIANT: For each k/v, v.Name() == k
	for k, v := range fs.implicitDirInodes {
		if !(v.Name() == k) {
			panic(fmt.Sprintf(
				"Unexpected name: \"%s\" vs. \"%s\"",
				v.Name(),
				k))
		}
	}

	// INVARIANT: For each value v, inodes[v.ID()] == v
	for _, v := range fs.implicitDirInodes {
		if fs.inodes[v.ID()] != v {
			panic(fmt.Sprintf(
				"Mismatch for ID %v: %v %v",
				v.ID(),
				fs.inodes[v.ID()],
				v))
		}
	}

	// INVARIANT: For each value v, v is not ExplicitDirInode
	for _, v := range fs.implicitDirInodes {
		if _, ok := v.(inode.ExplicitDirInode); ok {
			panic(fmt.Sprintf(
				"Unexpected implicit dir inode %d, type %T",
				v.ID(),
				v))
		}
	}

	// INVARIANT: For each in in inodes such that in is DirInode but not
	//            ExplicitDirInode, implicitDirInodes[in.Name()] == in
	for _, in := range fs.inodes {
		_, dir := in.(inode.DirInode)
		_, edir := in.(inode.ExplicitDirInode)
		if dir && !edir {
			if !(fs.implicitDirInodes[in.Name()] == in) {
				panic(fmt.Sprintf(
					"implicitDirInodes mismatch: %q %v %v",
					in.Name(),
					fs.implicitDirInodes[in.Name()],
					in))
			}
		}
	}
}

// checkInvariantsForGenerationBackedInodes panics if any invariant of the
// generationBackedInodes map is violated.
func (fs *fileSystem) checkInvariantsForGenerationBackedInodes() {
	// INVARIANT: For each k/v, v.Name() == k
	for k, v := range fs.generationBackedInodes {
		if !(v.Name() == k) {
			panic(fmt.Sprintf(
				"Unexpected name: \"%s\" vs. \"%s\"",
				v.Name(),
				k))
		}
	}

	// INVARIANT: For each value v, inodes[v.ID()] == v
	for _, v := range fs.generationBackedInodes {
		if fs.inodes[v.ID()] != v {
			panic(fmt.Sprintf(
				"Mismatch for ID %v: %v %v",
				v.ID(),
				fs.inodes[v.ID()],
				v))
		}
	}
}

// checkInvariantsForInodes panics if any invariant of the inodes map is
// violated.
func (fs *fileSystem) checkInvariantsForInodes() {
	// INVARIANT: For all keys k, fuseops.RootInodeID <= k < nextInodeID
	for id := range fs.inodes {
		if id < fuseops.RootInodeID || id >= fs.nextInodeID {
			panic(fmt.Sprintf("Illegal inode ID: %v", id))
		}
	}

	// INVARIANT: For all keys k, inodes[k].ID() == k
	for id, in := range fs.inodes {
		if in.ID() != id {
			panic(fmt.Sprintf("ID mismatch: %v vs. %v", in.ID(), id))
		}
	}

	// INVARIANT: inodes[fuseops.RootInodeID] is missing or of type inode.DirInode
	//
	// The missing case is when we've received a forget request for the root
	// inode, while unmounting.
	switch in := fs.inodes[fuseops.RootInodeID].(type) {
	case nil:
	case inode.DirInode:
	default:
		panic(fmt.Sprintf("Unexpected type for root: %v", reflect.TypeOf(in)))
	}

	// INVARIANT: For all v, if v.Name().IsDir() then v is inode.DirInode
	for _, in := range fs.inodes {
		if in.Name().IsDir() {
			_, ok := in.(inode.DirInode)
			if !ok {
				panic(fmt.Sprintf(
					"Unexpected inode type for name \"%s\": %v",
					in.Name(),
					reflect.TypeOf(in)))
			}
		}
	}
}

// checkInvariants runs all per-map invariant checks plus the handle and
// handle-ID invariants. Installed as the check function of fs.mu, so it runs
// whenever the locker is configured to verify state.
func (fs *fileSystem) checkInvariants() {
	// Check invariants for different type of inodes
	fs.checkInvariantsForInodes()
	fs.checkInvariantsForGenerationBackedInodes()
	fs.checkInvariantsForImplicitDirs()
	fs.checkInvariantsForFolderInodes()
	fs.checkInvariantsForLocalFileInodes()

	//////////////////////////////////
	// handles
	//////////////////////////////////

	// INVARIANT: All values are of type *dirHandle or *handle.FileHandle
	for _, h := range fs.handles {
		switch h.(type) {
		case *handle.DirHandle:
		case *handle.FileHandle:
		default:
			panic(fmt.Sprintf("Unexpected handle type: %T", h))
		}
	}

	//////////////////////////////////
	// nextHandleID
	//////////////////////////////////

	// INVARIANT: For all keys k in handles, k < nextHandleID
	for k := range fs.handles {
		if k >= fs.nextHandleID {
			panic(fmt.Sprintf("Illegal handle ID: %v", k))
		}
	}
}

// createExplicitDirInode creates a directory inode backed by an explicit GCS
// object (or folder, for hierarchical buckets) described by ic.
func (fs *fileSystem) createExplicitDirInode(inodeID fuseops.InodeID, ic inode.Core) inode.Inode {
	in := inode.NewExplicitDirInode(
		inodeID,
		ic.FullName,
		ic.MinObject,
		fuseops.InodeAttributes{
			Uid:  fs.uid,
			Gid:  fs.gid,
			Mode: fs.dirMode,

			// We guarantee only that directory times be "reasonable".
			Atime: fs.mtimeClock.Now(),
			Ctime: fs.mtimeClock.Now(),
			Mtime: fs.mtimeClock.Now(),
		},
		fs.implicitDirs,
		fs.newConfig.List.EnableEmptyManagedFolders,
		fs.enableNonexistentTypeCache,
		fs.dirTypeCacheTTL,
		ic.Bucket,
		fs.mtimeClock,
		fs.cacheClock,
		fs.newConfig.MetadataCache.TypeCacheMaxSizeMb,
		fs.newConfig.EnableHns)

	return in
}

// Implementation detail of lookUpOrCreateInodeIfNotStale; do not use outside
// of that function.
//
// mintInode allocates a fresh inode ID and creates the appropriate inode kind
// for ic (explicit dir/folder, implicit dir, symlink, or file), registering
// it in fs.inodes.
//
// LOCKS_REQUIRED(fs.mu)
func (fs *fileSystem) mintInode(ic inode.Core) (in inode.Inode) {
	// Choose an ID.
	id := fs.nextInodeID
	fs.nextInodeID++

	// Create the inode.
	switch {
	// Explicit directories or folders in hierarchical bucket.
	case (ic.MinObject != nil && ic.FullName.IsDir()), ic.Folder != nil:
		in = fs.createExplicitDirInode(id, ic)

	// Implicit directories
	case ic.FullName.IsDir():
		in = inode.NewDirInode(
			id,
			ic.FullName,
			fuseops.InodeAttributes{
				Uid:  fs.uid,
				Gid:  fs.gid,
				Mode: fs.dirMode,

				// We guarantee only that directory times be "reasonable".
				Atime: fs.mtimeClock.Now(),
				Ctime: fs.mtimeClock.Now(),
				Mtime: fs.mtimeClock.Now(),
			},
			fs.implicitDirs,
			fs.newConfig.List.EnableEmptyManagedFolders,
			fs.enableNonexistentTypeCache,
			fs.dirTypeCacheTTL,
			ic.Bucket,
			fs.mtimeClock,
			fs.cacheClock,
			fs.newConfig.MetadataCache.TypeCacheMaxSizeMb,
			fs.newConfig.EnableHns,
		)

	case inode.IsSymlink(ic.MinObject):
		in = inode.NewSymlinkInode(
			id,
			ic.FullName,
			ic.MinObject,
			fuseops.InodeAttributes{
				Uid:  fs.uid,
				Gid:  fs.gid,
				Mode: fs.fileMode | os.ModeSymlink,
			})

	default:
		in = inode.NewFileInode(
			id,
			ic.FullName,
			ic.MinObject,
			fuseops.InodeAttributes{
				Uid:  fs.uid,
				Gid:  fs.gid,
				Mode: fs.fileMode,
			},
			ic.Bucket,
			fs.localFileCache,
			fs.contentCache,
			fs.mtimeClock,
			ic.Local,
			fs.newConfig,
			fs.globalMaxWriteBlocksSem)
	}

	// Place it in our map of IDs to inodes.
	fs.inodes[in.ID()] = in

	return
}

// createDirInode returns the dir inode for ic.FullName from the supplied map,
// minting a new one if there is no live entry. Returns nil if the retry limit
// is exhausted due to concurrent map mutations.
//
// NOTE(review): the annotations below are inherited from the original; the
// body unlocks and re-locks fs.mu, which implies fs.mu is held on entry —
// confirm against callers (lookUpOrCreateInodeIfNotStale holds fs.mu).
//
// LOCKS_EXCLUDED(fs.mu)
// UNLOCK_FUNCTION(fs.mu)
// LOCK_FUNCTION(in)
func (fs *fileSystem) createDirInode(ic inode.Core, inodes map[inode.Name]inode.DirInode) inode.Inode {
	if !ic.FullName.IsDir() {
		panic(fmt.Sprintf("Unexpected name for a directory: %q", ic.FullName))
	}

	var maxTriesToCreateInode = 3

	for n := 0; n < maxTriesToCreateInode; n++ {
		in, ok := (inodes)[ic.FullName]
		// Create a new inode when a folder is created first time, or when a folder is deleted and then recreated with the same name.
		if !ok || in.IsUnlinked() {
			in := fs.mintInode(ic)
			(inodes)[in.Name()] = in.(inode.DirInode)
			in.Lock()
			return in
		}
		// Lock ordering requires inode lock before fs lock, so drop fs.mu while
		// acquiring the inode lock, then revalidate the map entry.
		fs.mu.Unlock()
		in.Lock()
		fs.mu.Lock()
		if (inodes)[ic.FullName] != in {
			in.Unlock()
			continue
		}
		return in
	}
	return nil
}

// Attempt to find an inode for a backing object or an implicit directory.
// Create an inode if (1) it has never yet existed, or (2) the object is newer
// than the existing one.
//
// If the backing object is older than the existing inode, return nil. In this
// case, the caller may obtain a fresh record and try again. Otherwise,
// increment the inode's lookup count and return it locked.
//
// LOCKS_EXCLUDED(fs.mu)
// UNLOCK_FUNCTION(fs.mu)
// LOCK_FUNCTION(in)
func (fs *fileSystem) lookUpOrCreateInodeIfNotStale(ic inode.Core) (in inode.Inode) {
	if err := ic.SanityCheck(); err != nil {
		panic(err.Error())
	}

	// Ensure that no matter which inode we return, we increase its lookup count
	// on the way out and then release the file system lock.
	defer func() {
		if in != nil {
			in.IncrementLookupCount()
		}

		fs.mu.Unlock()
	}()

	fs.mu.Lock()

	// Handle Folders in hierarchical bucket.
	if ic.Folder != nil {
		return fs.createDirInode(ic, fs.folderInodes)
	}

	// Handle implicit directories.
	if ic.MinObject == nil {
		return fs.createDirInode(ic, fs.implicitDirInodes)
	}

	oGen := inode.Generation{
		Object:   ic.MinObject.Generation,
		Metadata: ic.MinObject.MetaGeneration,
		Size:     ic.MinObject.Size,
	}

	// Retry loop for the stale index entry case below. On entry, we hold fs.mu
	// but no inode lock.
	for {
		// Look at the current index entry.
		existingInode, ok := fs.generationBackedInodes[ic.FullName]

		// If we have no existing record, mint an inode and return it.
		if !ok {
			in = fs.mintInode(ic)
			fs.generationBackedInodes[in.Name()] = in.(inode.GenerationBackedInode)

			in.Lock()
			return
		}

		// Otherwise we need to read the inode's source generation below, which
		// requires the inode's lock. We must not hold the inode lock while
		// acquiring the file system lock, so drop it while acquiring the inode's
		// lock, then reacquire.
		fs.mu.Unlock()
		existingInode.Lock()
		fs.mu.Lock()

		// Check that the index still points at this inode. If not, it's possible
		// that the inode is in the process of being destroyed and is unsafe to
		// use. Go around and try again.
		if fs.generationBackedInodes[ic.FullName] != existingInode {
			existingInode.Unlock()
			continue
		}

		// Have we found the correct inode?
		cmp := oGen.Compare(existingInode.SourceGeneration())
		if cmp == 0 {
			in = existingInode
			return
		}

		// The existing inode is newer than the backing object. The caller
		// should call again with a newer backing object.
		if cmp == -1 {
			existingInode.Unlock()
			return
		}

		// The backing object is newer than the existing inode, while
		// holding the inode lock, excluding concurrent actions by the inode (in
		// particular concurrent calls to Sync, which changes generation numbers).
		// This means we've proven that the record cannot have been caused by the
		// inode's actions, and therefore this is not the inode we want.
		//
		// Replace it with a newly-minted inode and then go around, acquiring its
		// lock in accordance with our lock ordering rules.
		existingInode.Unlock()

		in = fs.mintInode(ic)
		fs.generationBackedInodes[in.Name()] = in.(inode.GenerationBackedInode)

		continue
	}
}

// Look up the child with the given name within the parent, then return an
// existing inode for that child or create a new one if necessary. Return
// ENOENT if the child doesn't exist.
//
// Return the child locked, incrementing its lookup count.
//
// LOCKS_EXCLUDED(fs.mu)
// LOCKS_EXCLUDED(parent)
// LOCK_FUNCTION(child)
func (fs *fileSystem) lookUpOrCreateChildInode(
	ctx context.Context,
	parent inode.DirInode,
	childName string) (child inode.Inode, err error) {
	// First check if the requested child is a localFileInode.
	child, err = fs.lookUpLocalFileInode(parent, childName)
	if err != nil {
		return nil, err
	}
	if child != nil {
		return
	}

	// If the requested child is not a localFileInode, continue with the existing
	// flow of checking GCS for file/directory.

	// Set up a function that will find a lookup result for the child with the
	// given name. Expects no locks to be held.
	getLookupResult := func() (*inode.Core, error) {
		if fs.newConfig.FileSystem.DisableParallelDirops {
			parent.Lock()
			defer parent.Unlock()
		} else {
			// LockForChildLookup takes read-only or exclusive lock based on the
			// inode when its child is looked up.
			parent.LockForChildLookup()
			defer parent.UnlockForChildLookup()
		}
		return parent.LookUpChild(ctx, childName)
	}

	// Run a retry loop around lookUpOrCreateInodeIfNotStale.
	const maxTries = 3
	for n := 0; n < maxTries; n++ {
		// Create a record.
		var core *inode.Core
		core, err = getLookupResult()

		if err != nil {
			return
		}

		if core == nil {
			err = fuse.ENOENT
			return
		}

		// Attempt to create the inode. Return if successful.
		child = fs.lookUpOrCreateInodeIfNotStale(*core)
		if child != nil {
			return
		}
	}

	err = fmt.Errorf("cannot find %q in %q with %v tries", childName, parent.Name(), maxTries)
	return
}

// Look up the localFileInodes to check if a file with given name exists.
// Return inode if it exists, else return nil.
//
// LOCKS_EXCLUDED(fs.mu)
// LOCKS_EXCLUDED(parent)
// UNLOCK_FUNCTION(fs.mu)
// LOCK_FUNCTION(child)
func (fs *fileSystem) lookUpLocalFileInode(parent inode.DirInode, childName string) (child inode.Inode, err error) {
	// If the path specified is "a/\n", the child would come as \n which is not a valid childname.
	// In such cases, simply return a file-not-found.
	if childName == inode.ConflictingFileNameSuffix {
		return nil, syscall.ENOENT
	}
	// Trim the suffix assigned to fix conflicting names.
	childName = strings.TrimSuffix(childName, inode.ConflictingFileNameSuffix)
	fileName := inode.NewFileName(parent.Name(), childName)
	fs.mu.Lock()
	defer func() {
		if child != nil {
			child.IncrementLookupCount()
		}
		fs.mu.Unlock()
	}()

	var maxTriesToLookupInode = 3
	for n := 0; n < maxTriesToLookupInode; n++ {
		child = fs.localFileInodes[fileName]
		if child == nil {
			return
		}
		// If the inode already exists, we need to follow the lock ordering rules
		// to get the lock. First get inode lock and then fs lock.
		fs.mu.Unlock()
		child.Lock()
		// Acquiring fs lock early to use common defer function even though it is
		// not required to check if local file inode has been unlinked.
		// Filesystem lock will be held till we increment lookUpCount to avoid
		// deletion of inode from fs.inodes/fs.localFileInodes map by other flows.
		fs.mu.Lock()
		// Check if local file inode has been unlinked?
fileInode, ok := child.(*inode.FileInode) if ok && fileInode.IsUnlinked() { child.Unlock() child = nil return } // Once we get fs lock, validate if the inode is still valid. If not // try to fetch it again. Eg: If the inode is deleted by other thread after // we fetched it from fs.localFileInodes map, then any call to perform // inode operation will crash GCSFuse since the inode is not valid. Hence // it is important to acquire lock and increment lookUpCount before letting // other threads modify it. if fs.localFileInodes[fileName] != child { child.Unlock() continue } return } // In case we exhausted the retries, return nil object. child = nil return } // Look up the child directory with the given name within the parent, then // return an existing dir inode for that child or create a new one if necessary. // Return ENOENT if the child doesn't exist. // // Return the child locked, incrementing its lookup count. // // LOCKS_EXCLUDED(fs.mu) // LOCKS_EXCLUDED(parent) // LOCK_FUNCTION(child) func (fs *fileSystem) lookUpOrCreateChildDirInode( ctx context.Context, parent inode.DirInode, childName string) (child inode.BucketOwnedDirInode, err error) { in, err := fs.lookUpOrCreateChildInode(ctx, parent, childName) if err != nil { return nil, fmt.Errorf("lookup or create %q: %w", childName, err) } var ok bool if child, ok = in.(inode.BucketOwnedDirInode); !ok { fs.unlockAndDecrementLookupCount(in, 1) return nil, fmt.Errorf("not a bucket owned directory: %q", childName) } return child, nil } // promoteToGenerationBacked updates the file system maps for the given file inode // after it has been synced to GCS. // The inode is removed from the localFileInodes map and added to the // generationBackedInodes map. 
//
// LOCKS_EXCLUDED(fs.mu)
// LOCKS_REQUIRED(f)
func (fs *fileSystem) promoteToGenerationBacked(f *inode.FileInode) {
	fs.mu.Lock()
	delete(fs.localFileInodes, f.Name())
	// Only insert if absent: an existing entry for this name must not be
	// displaced by a possibly-stale inode.
	if _, ok := fs.generationBackedInodes[f.Name()]; !ok {
		fs.generationBackedInodes[f.Name()] = f
	}
	fs.mu.Unlock()
	// We need not update fileIndex:
	//
	// We've held the inode lock the whole time, so there's no way that this
	// inode could have been booted from the index. Therefore, if it's not in the
	// index at the moment, it must not have been in there when we started. That
	// is, it must have been clobbered remotely.
	//
	// In other words, either this inode is still in the index or it has been
	// clobbered and *should* be anonymous.
}

// Flushes the supplied file inode to GCS, updating the index as
// appropriate.
//
// LOCKS_EXCLUDED(fs.mu)
// LOCKS_REQUIRED(f)
func (fs *fileSystem) flushFile(
	ctx context.Context,
	f *inode.FileInode) error {
	// FlushFile mirrors the behavior of native filesystems by not returning an error
	// when file to be synced has been unlinked from the same mount.
	if f.IsUnlinked() {
		return nil
	}

	// Flush the inode.
	err := f.Flush(ctx)
	if err != nil {
		err = fmt.Errorf("FileInode.Sync: %w", err)
		// If the inode was local file inode, treat it as unlinked.
		fs.mu.Lock()
		delete(fs.localFileInodes, f.Name())
		fs.mu.Unlock()
		return err
	}

	// Promote the inode to generationBackedInodes in fs maps.
	fs.promoteToGenerationBacked(f)
	return nil
}

// Synchronizes the supplied file inode to GCS, updating the index as
// appropriate.
//
// LOCKS_EXCLUDED(fs.mu)
// LOCKS_REQUIRED(f)
func (fs *fileSystem) syncFile(
	ctx context.Context,
	f *inode.FileInode) error {
	// SyncFile mirrors the behavior of native filesystems by not returning an error
	// when file to be synced has been unlinked from the same mount.
	if f.IsUnlinked() {
		return nil
	}

	// Sync the inode. Unlike Flush above, Sync may leave the object not yet
	// fully committed; gcsSynced reports whether a GCS generation now backs it.
	gcsSynced, err := f.Sync(ctx)
	if err != nil {
		err = fmt.Errorf("FileInode.Sync: %w", err)
		// If the inode was local file inode, treat it as unlinked.
		fs.mu.Lock()
		delete(fs.localFileInodes, f.Name())
		fs.mu.Unlock()
		return err
	}

	// If gcsSynced is true, it means the inode was fully synced to GCS. In this
	// case, we need to promote the inode to generationBackedInodes in fs maps.
	if gcsSynced {
		fs.promoteToGenerationBacked(f)
	}
	return nil
}

// Initializes Buffered Write Handler if Eligible and synchronizes the file inode to GCS if initialization succeeds.
// Otherwise creates an empty temp writer if temp file nil.
//
// LOCKS_EXCLUDED(fs.mu)
// LOCKS_REQUIRED(f.mu)
func (fs *fileSystem) createBufferedWriteHandlerAndSyncOrTempWriter(ctx context.Context, f *inode.FileInode) error {
	err := fs.initBufferedWriteHandlerAndSyncFileIfEligible(ctx, f)
	if err != nil {
		return err
	}
	// NOTE(review): CreateEmptyTempFile is invoked unconditionally, even when
	// the buffered write handler was just initialized — presumably it is a
	// no-op in that case; confirm against FileInode.CreateEmptyTempFile.
	err = f.CreateEmptyTempFile(ctx)
	if err != nil {
		return err
	}
	return nil
}

// Initializes Buffered Write Handler if Eligible and synchronizes the file inode to GCS if initialization succeeds.
//
// LOCKS_EXCLUDED(fs.mu)
// LOCKS_REQUIRED(f.mu)
func (fs *fileSystem) initBufferedWriteHandlerAndSyncFileIfEligible(ctx context.Context, f *inode.FileInode) error {
	initialized, err := f.InitBufferedWriteHandlerIfEligible(ctx)
	if err != nil {
		return err
	}
	if initialized {
		// Calling syncFile is safe here as we have file inode lock and BWH is initialized.
		// Thus sync method of BWH will be invoked.
		// 1. In case of zonal bucket it creates unfinalized new generation object.
		// 2. In case of non zonal bucket it's no-op as we don't have pending buffers to upload.
		err = fs.syncFile(ctx, f)
		if err != nil {
			return err
		}
	}
	return nil
}

// Decrement the supplied inode's lookup count, destroying it if the inode says
// that it has hit zero.
//
// We require the file system lock to exclude concurrent lookups, which might
// otherwise find an inode whose lookup count has gone to zero.
//
// LOCKS_REQUIRED(in)
// LOCKS_EXCLUDED(fs.mu)
// UNLOCK_FUNCTION(fs.mu)
// UNLOCK_FUNCTION(in)
func (fs *fileSystem) unlockAndDecrementLookupCount(in inode.Inode, N uint64) {
	name := in.Name()

	// Decrement the lookup count.
	shouldDestroy := in.DecrementLookupCount(N)

	// Update file system state, orphaning the inode if we're going to destroy it
	// below.
	if shouldDestroy {
		fs.mu.Lock()
		delete(fs.inodes, in.ID())

		// Update indexes if necessary. Each map is only cleaned if it still
		// points at this exact inode — a newer inode may have taken the name.
		if fs.generationBackedInodes[name] == in {
			delete(fs.generationBackedInodes, name)
		}
		if fs.implicitDirInodes[name] == in {
			delete(fs.implicitDirInodes, name)
		}
		if fs.localFileInodes[name] == in {
			delete(fs.localFileInodes, name)
		}
		if fs.folderInodes[name] == in {
			delete(fs.folderInodes, name)
		}
		fs.mu.Unlock()
	}

	// Now we can destroy the inode if necessary. This is done in a second
	// shouldDestroy block so that Destroy (which may do I/O) runs without
	// holding fs.mu.
	if shouldDestroy {
		destroyErr := in.Destroy()
		if destroyErr != nil {
			logger.Infof("Error destroying inode %q: %v", name, destroyErr)
		}
	}

	in.Unlock()
}

// A helper function for use after incrementing an inode's lookup count.
// Ensures that the lookup count is decremented again if the caller is going to
// return in error (in which case the kernel and gcsfuse would otherwise
// disagree about the lookup count for the inode's ID), so that the inode isn't
// leaked.
//
// Typical usage:
//
//	func (fs *fileSystem) doFoo() (err error) {
//		in, err := fs.lookUpOrCreateInodeIfNotStale(...)
//		if err != nil {
//			return
//		}
//
//		defer fs.unlockAndMaybeDisposeOfInode(in, &err)
//
//		...
//	}
//
// LOCKS_REQUIRED(in)
// LOCKS_EXCLUDED(fs.mu)
// UNLOCK_FUNCTION(in)
func (fs *fileSystem) unlockAndMaybeDisposeOfInode(
	in inode.Inode,
	err *error) {
	// If there is no error, just unlock.
	if *err == nil {
		in.Unlock()
		return
	}

	// Otherwise, go through the decrement helper
	fs.unlockAndDecrementLookupCount(in, 1)
}

// Fetch attributes for the supplied inode and fill in an appropriate
// expiration time for them.
//
// LOCKS_REQUIRED(in)
func (fs *fileSystem) getAttributes(
	ctx context.Context,
	in inode.Inode) (
	attr fuseops.InodeAttributes,
	expiration time.Time,
	err error) {
	// Call through.
	attr, err = in.Attributes(ctx)
	if err != nil {
		return
	}

	// Set up the expiration time. A zero TTL leaves expiration at its zero
	// value, i.e. no kernel-side attribute caching.
	if fs.inodeAttributeCacheTTL > 0 {
		expiration = time.Now().Add(fs.inodeAttributeCacheTTL)
	}

	return
}

// inodeOrDie returns the inode with the given ID, panicking with a helpful
// error message if it doesn't exist.
//
// LOCKS_REQUIRED(fs.mu)
func (fs *fileSystem) inodeOrDie(id fuseops.InodeID) (in inode.Inode) {
	in = fs.inodes[id]
	if in == nil {
		panic(fmt.Sprintf("inode %d doesn't exist", id))
	}

	return
}

// dirInodeOrDie returns the directory inode with the given ID, panicking with
// a helpful error message if it doesn't exist or is the wrong type.
//
// LOCKS_REQUIRED(fs.mu)
func (fs *fileSystem) dirInodeOrDie(id fuseops.InodeID) (in inode.DirInode) {
	tmp := fs.inodes[id]
	in, ok := tmp.(inode.DirInode)
	if !ok {
		panic(fmt.Sprintf("inode %d is %T, wanted inode.DirInode", id, tmp))
	}

	return
}

// fileInodeOrDie returns the file inode with the given ID, panicking with a
// helpful error message if it doesn't exist or is the wrong type.
//
// LOCKS_REQUIRED(fs.mu)
func (fs *fileSystem) fileInodeOrDie(id fuseops.InodeID) (in *inode.FileInode) {
	tmp := fs.inodes[id]
	in, ok := tmp.(*inode.FileInode)
	if !ok {
		panic(fmt.Sprintf("inode %d is %T, wanted *inode.FileInode", id, tmp))
	}

	return
}

// symlinkInodeOrDie returns the symlink inode with the given ID, panicking
// with a helpful error message if it doesn't exist or is the wrong type.
//
// LOCKS_REQUIRED(fs.mu)
func (fs *fileSystem) symlinkInodeOrDie(
	id fuseops.InodeID) (in *inode.SymlinkInode) {
	tmp := fs.inodes[id]
	in, ok := tmp.(*inode.SymlinkInode)
	if !ok {
		panic(fmt.Sprintf("inode %d is %T, wanted *inode.SymlinkInode", id, tmp))
	}

	return
}

// invalidateChildFileCacheIfExist invalidates the file in read cache. This is used to
// invalidate the file in read cache after deletion of original file.
func (fs *fileSystem) invalidateChildFileCacheIfExist(parentInode inode.DirInode, objectGCSName string) (err error) {
	// No-op when the file cache is disabled.
	if fs.fileCacheHandler != nil {
		if bucketOwnedDirInode, ok := parentInode.(inode.BucketOwnedDirInode); ok {
			bucketName := bucketOwnedDirInode.Bucket().Name()
			// Invalidate the file cache entry if it exists.
			err := fs.fileCacheHandler.InvalidateCache(objectGCSName, bucketName)
			if err != nil {
				return fmt.Errorf("invalidateChildFileCacheIfExist: while invalidating the file cache: %w", err)
			}
		} else {
			// The parentInode is not owned by any bucket, which means it's the base
			// directory that holds all the buckets' root directories. So, this op
			// is to delete a bucket, which is not supported.
			return fmt.Errorf("invalidateChildFileCacheIfExist: not an BucketOwnedDirInode: %w", syscall.ENOTSUP)
		}
	}
	return nil
}

////////////////////////////////////////////////////////////////////////
// fuse.FileSystem methods
////////////////////////////////////////////////////////////////////////

// Destroy shuts down the bucket manager and tears down the file cache (if
// any). Called when the file system is being unmounted.
func (fs *fileSystem) Destroy() {
	fs.bucketManager.ShutDown()
	if fs.fileCacheHandler != nil {
		// Best-effort cleanup; there is no caller to report the error to.
		_ = fs.fileCacheHandler.Destroy()
	}
}

func (fs *fileSystem) StatFS(
	ctx context.Context,
	op *fuseops.StatFSOp) (err error) {
	// Simulate a large amount of free space so that the Finder doesn't refuse to
	// copy in files. (See issue #125.) Use 2^17 as the block size because that
	// is the largest that OS X will pass on.
	op.BlockSize = 1 << 17
	op.Blocks = 1 << 33
	op.BlocksFree = op.Blocks
	op.BlocksAvailable = op.Blocks

	// Similarly with inodes.
	op.Inodes = 1 << 50
	op.InodesFree = op.Inodes

	// Prefer large transfers. This is the largest value that OS X will
	// faithfully pass on, according to fuseops/ops.go.
	op.IoSize = 1 << 20

	return
}

// LOCKS_EXCLUDED(fs.mu)
func (fs *fileSystem) LookUpInode(
	ctx context.Context,
	op *fuseops.LookUpInodeOp) (err error) {
	if fs.newConfig.FileSystem.IgnoreInterrupts {
		// When ignore interrupts config is set, we are creating a new context not
		// cancellable by parent context.
		var cancel context.CancelFunc
		ctx, cancel = util.IsolateContextFromParentContext(ctx)
		defer cancel()
	}
	// Find the parent directory in question.
	fs.mu.Lock()
	parent := fs.dirInodeOrDie(op.Parent)
	fs.mu.Unlock()

	// Find or create the child inode.
	child, err := fs.lookUpOrCreateChildInode(ctx, parent, op.Name)
	if err != nil {
		return err
	}

	// Undo the lookup-count increment if we fail below.
	defer fs.unlockAndMaybeDisposeOfInode(child, &err)

	// Fill out the response.
	e := &op.Entry
	e.Child = child.ID()
	e.Attributes, e.AttributesExpiration, err = fs.getAttributes(ctx, child)
	if err != nil {
		return err
	}

	return
}

// LOCKS_EXCLUDED(fs.mu)
func (fs *fileSystem) GetInodeAttributes(
	ctx context.Context,
	op *fuseops.GetInodeAttributesOp) (err error) {
	if fs.newConfig.FileSystem.IgnoreInterrupts {
		// When ignore interrupts config is set, we are creating a new context not
		// cancellable by parent context.
		var cancel context.CancelFunc
		ctx, cancel = util.IsolateContextFromParentContext(ctx)
		defer cancel()
	}
	// Find the inode.
	fs.mu.Lock()
	in := fs.inodeOrDie(op.Inode)
	fs.mu.Unlock()

	in.Lock()
	defer in.Unlock()

	// Grab its attributes.
	op.Attributes, op.AttributesExpiration, err = fs.getAttributes(ctx, in)
	if err != nil {
		return err
	}

	return
}

// LOCKS_EXCLUDED(fs.mu)
func (fs *fileSystem) SetInodeAttributes(
	ctx context.Context,
	op *fuseops.SetInodeAttributesOp) (err error) {
	if fs.newConfig.FileSystem.IgnoreInterrupts {
		// When ignore interrupts config is set, we are creating a new context not
		// cancellable by parent context.
		var cancel context.CancelFunc
		ctx, cancel = util.IsolateContextFromParentContext(ctx)
		defer cancel()
	}
	// Find the inode.
	fs.mu.Lock()
	in := fs.inodeOrDie(op.Inode)
	fs.mu.Unlock()

	in.Lock()
	defer in.Unlock()

	// Only file inodes support mtime updates and truncation below.
	file, isFile := in.(*inode.FileInode)

	// Set file mtimes.
	if isFile && op.Mtime != nil {
		err = file.SetMtime(ctx, *op.Mtime)
		if err != nil {
			err = fmt.Errorf("SetMtime: %w", err)
			return err
		}
	}

	// Truncate files.
	if isFile && op.Size != nil {
		// Initialize BWH if eligible and Sync file inode.
		err = fs.initBufferedWriteHandlerAndSyncFileIfEligible(ctx, file)
		if err != nil {
			return
		}
		err = file.Truncate(ctx, int64(*op.Size))
		if err != nil {
			err = fmt.Errorf("truncate: %w", err)
			return err
		}
	}

	// We silently ignore updates to mode and atime.

	// Fill in the response.
	op.Attributes, op.AttributesExpiration, err = fs.getAttributes(ctx, in)
	if err != nil {
		err = fmt.Errorf("getAttributes: %w", err)
		return err
	}

	return
}

// LOCKS_EXCLUDED(fs.mu)
func (fs *fileSystem) ForgetInode(
	ctx context.Context,
	op *fuseops.ForgetInodeOp) (err error) {
	// Find the inode.
	fs.mu.Lock()
	in := fs.inodeOrDie(op.Inode)
	fs.mu.Unlock()

	// Decrement and unlock. This may destroy the inode if the count hits zero.
	in.Lock()
	fs.unlockAndDecrementLookupCount(in, op.N)

	return
}

// LOCKS_EXCLUDED(fs.mu)
func (fs *fileSystem) MkDir(
	ctx context.Context,
	op *fuseops.MkDirOp) (err error) {
	if fs.newConfig.FileSystem.IgnoreInterrupts {
		// When ignore interrupts config is set, we are creating a new context not
		// cancellable by parent context.
		var cancel context.CancelFunc
		ctx, cancel = util.IsolateContextFromParentContext(ctx)
		defer cancel()
	}
	// Find the parent.
	fs.mu.Lock()
	parent := fs.dirInodeOrDie(op.Parent)
	fs.mu.Unlock()

	// Create an empty backing object for the child, failing if it already
	// exists.
	parent.Lock()
	result, err := parent.CreateChildDir(ctx, op.Name)
	parent.Unlock()

	// Special case: *gcs.PreconditionError means the name already exists.
	var preconditionErr *gcs.PreconditionError
	if errors.As(err, &preconditionErr) {
		err = fuse.EEXIST
		return
	}

	// Propagate other errors.
	if err != nil {
		err = fmt.Errorf("CreateChildDir: %w", err)
		return err
	}

	// Attempt to create a child inode using the object we created. If we fail to
	// do so, it means someone beat us to the punch with a newer generation
	// (unlikely, so we're probably okay with failing here).
	child := fs.lookUpOrCreateInodeIfNotStale(*result)
	if child == nil {
		err = fmt.Errorf("newly-created record is already stale")
		return err
	}

	defer fs.unlockAndMaybeDisposeOfInode(child, &err)

	// Fill out the response.
	e := &op.Entry
	e.Child = child.ID()
	e.Attributes, e.AttributesExpiration, err = fs.getAttributes(ctx, child)
	if err != nil {
		err = fmt.Errorf("getAttributes: %w", err)
		return err
	}

	return
}

// LOCKS_EXCLUDED(fs.mu)
func (fs *fileSystem) MkNode(
	ctx context.Context,
	op *fuseops.MkNodeOp) (err error) {
	if fs.newConfig.FileSystem.IgnoreInterrupts {
		// When ignore interrupts config is set, we are creating a new context not
		// cancellable by parent context.
		var cancel context.CancelFunc
		ctx, cancel = util.IsolateContextFromParentContext(ctx)
		defer cancel()
	}
	// Named pipes and sockets have no GCS representation; reject them.
	if (op.Mode & (iofs.ModeNamedPipe | iofs.ModeSocket)) != 0 {
		return syscall.ENOTSUP
	}

	// Create the child.
	child, err := fs.createFile(ctx, op.Parent, op.Name, op.Mode)
	if err != nil {
		return err
	}

	defer fs.unlockAndMaybeDisposeOfInode(child, &err)

	// Fill out the response.
	e := &op.Entry
	e.Child = child.ID()
	e.Attributes, e.AttributesExpiration, err = fs.getAttributes(ctx, child)
	if err != nil {
		err = fmt.Errorf("getAttributes: %w", err)
		return err
	}

	return
}

// Create a child of the parent with the given ID, returning the child locked
// and with its lookup count incremented.
//
// LOCKS_EXCLUDED(fs.mu)
// LOCK_FUNCTION(child)
func (fs *fileSystem) createFile(
	ctx context.Context,
	parentID fuseops.InodeID,
	name string,
	mode os.FileMode) (child inode.Inode, err error) {
	// Find the parent.
	fs.mu.Lock()
	parent := fs.dirInodeOrDie(parentID)
	fs.mu.Unlock()

	// Create an empty backing object for the child, failing if it already
	// exists.
	parent.Lock()
	result, err := parent.CreateChildFile(ctx, name)
	parent.Unlock()

	// Special case: *gcs.PreconditionError means the name already exists.
	var preconditionErr *gcs.PreconditionError
	if errors.As(err, &preconditionErr) {
		err = fuse.EEXIST
		return
	}

	// Propagate other errors.
	if err != nil {
		err = fmt.Errorf("CreateChildFile: %w", err)
		return
	}

	// Attempt to create a child inode using the object we created. If we fail to
	// do so, it means someone beat us to the punch with a newer generation
	// (unlikely, so we're probably okay with failing here).
	child = fs.lookUpOrCreateInodeIfNotStale(*result)
	if child == nil {
		err = fmt.Errorf("newly-created record is already stale")
		return
	}

	return
}

// Creates localFileInode with the given name under the parent inode.
// LOCKS_EXCLUDED(fs.mu)
// UNLOCK_FUNCTION(fs.mu)
// LOCK_FUNCTION(child)
func (fs *fileSystem) createLocalFile(ctx context.Context, parentID fuseops.InodeID, name string) (child inode.Inode, err error) {
	// Find the parent.
	fs.mu.Lock()
	parent := fs.dirInodeOrDie(parentID)
	// Cleanup/handoff defer: runs with fs.mu held (either never released, or
	// re-taken by the `defer fs.mu.Lock()` registered below, which fires first).
	defer func() {
		if err != nil {
			if child == nil {
				// NOTE(review): this early return skips fs.mu.Unlock() below,
				// so an error before the inode is minted (e.g.
				// CreateLocalChildFileCore failing) appears to leave fs.mu
				// held — confirm this path and unlock if reachable.
				return
			}
			// fs.mu lock is already taken
			delete(fs.localFileInodes, child.Name())
		}
		// We need to release the filesystem lock before acquiring the inode lock.
		fs.mu.Unlock()
		if child != nil {
			child.Lock()
			child.IncrementLookupCount()
			// Unlock is done by the calling method.
		}
	}()

	fullName := inode.NewFileName(parent.Name(), name)
	child, ok := fs.localFileInodes[fullName]
	// Reuse an existing, still-linked local inode for the same name.
	if ok && !child.(*inode.FileInode).IsUnlinked() {
		return
	}
	// Create a new inode when a file is created first time, or when a local file is unlinked and then recreated with the same name.
	core, err := parent.CreateLocalChildFileCore(name)
	if err != nil {
		return
	}
	child = fs.mintInode(core)
	fs.localFileInodes[child.Name()] = child
	fileInode := child.(*inode.FileInode)
	// Use deferred locking on filesystem so that it is locked before the defer call that unlocks the mutex and it doesn't fail.
	// We need to release the filesystem lock before acquiring the inode lock.
	fs.mu.Unlock()
	defer fs.mu.Lock()
	fileInode.Lock()
	err = fs.createBufferedWriteHandlerAndSyncOrTempWriter(ctx, fileInode)
	fileInode.Unlock()
	if err != nil {
		return
	}

	parent.Lock()
	defer parent.Unlock()
	parent.InsertFileIntoTypeCache(name)
	return child, nil
}

// LOCKS_EXCLUDED(fs.mu)
func (fs *fileSystem) CreateFile(
	ctx context.Context,
	op *fuseops.CreateFileOp) (err error) {
	if fs.newConfig.FileSystem.IgnoreInterrupts {
		// When ignore interrupts config is set, we are creating a new context not
		// cancellable by parent context.
		var cancel context.CancelFunc
		ctx, cancel = util.IsolateContextFromParentContext(ctx)
		defer cancel()
	}
	// Create the child. With CreateEmptyFile the object is written to GCS
	// immediately; otherwise the file exists only locally until first sync.
	var child inode.Inode
	if fs.newConfig.Write.CreateEmptyFile {
		child, err = fs.createFile(ctx, op.Parent, op.Name, op.Mode)
	} else {
		child, err = fs.createLocalFile(ctx, op.Parent, op.Name)
	}
	if err != nil {
		return err
	}

	defer fs.unlockAndMaybeDisposeOfInode(child, &err)

	// Allocate a handle.
	fs.mu.Lock()
	handleID := fs.nextHandleID
	fs.nextHandleID++

	// Creating new file is always a write operation, hence passing readOnly as false.
	fs.handles[handleID] = handle.NewFileHandle(child.(*inode.FileInode), fs.fileCacheHandler, fs.cacheFileForRangeRead, fs.metricHandle, false)
	op.Handle = handleID

	fs.mu.Unlock()

	// Fill out the response.
	e := &op.Entry
	e.Child = child.ID()
	e.Attributes, e.AttributesExpiration, err = fs.getAttributes(ctx, child)
	if err != nil {
		err = fmt.Errorf("getAttributes: %w", err)
		return err
	}

	return
}

// LOCKS_EXCLUDED(fs.mu)
func (fs *fileSystem) CreateSymlink(
	ctx context.Context,
	op *fuseops.CreateSymlinkOp) (err error) {
	if fs.newConfig.FileSystem.IgnoreInterrupts {
		// When ignore interrupts config is set, we are creating a new context not
		// cancellable by parent context.
		var cancel context.CancelFunc
		ctx, cancel = util.IsolateContextFromParentContext(ctx)
		defer cancel()
	}
	// Find the parent.
	fs.mu.Lock()
	parent := fs.dirInodeOrDie(op.Parent)
	fs.mu.Unlock()

	// Create the object in GCS, failing if it already exists.
	parent.Lock()
	result, err := parent.CreateChildSymlink(ctx, op.Name, op.Target)
	parent.Unlock()

	// Special case: *gcs.PreconditionError means the name already exists.
	var preconditionErr *gcs.PreconditionError
	if errors.As(err, &preconditionErr) {
		err = fuse.EEXIST
		return
	}

	// Propagate other errors.
	if err != nil {
		err = fmt.Errorf("CreateChildSymlink: %w", err)
		return err
	}

	// Attempt to create a child inode using the object we created. If we fail to
	// do so, it means someone beat us to the punch with a newer generation
	// (unlikely, so we're probably okay with failing here).
	child := fs.lookUpOrCreateInodeIfNotStale(*result)
	if child == nil {
		err = fmt.Errorf("newly-created record is already stale")
		return err
	}

	defer fs.unlockAndMaybeDisposeOfInode(child, &err)

	// Fill out the response.
	e := &op.Entry
	e.Child = child.ID()
	e.Attributes, e.AttributesExpiration, err = fs.getAttributes(ctx, child)
	if err != nil {
		err = fmt.Errorf("getAttributes: %w", err)
		return err
	}

	return
}

// LOCKS_EXCLUDED(fs.mu)
func (fs *fileSystem) RmDir(
	// When rm -r or os.RemoveAll call is made, the following calls are made in order
	//	 1. RmDir (only in the case of os.RemoveAll)
	//	 2. Unlink all nested files,
	//	 3. lookupInode call on implicit directory
	//	 4. Rmdir on the directory.
	//
	// When type cache ttl is set, we construct an implicitDir even though one doesn't
	// exist on GCS (https://github.com/GoogleCloudPlatform/gcsfuse/blob/master/internal/fs/inode/dir.go#L452),
	// and thus, we get rmDir call to GCSFuse.
	// Whereas when ttl is zero, lookupInode call itself fails and RmDir is not called
	// because object is not present in GCS.
	ctx context.Context,
	op *fuseops.RmDirOp) (err error) {
	if fs.newConfig.FileSystem.IgnoreInterrupts {
		// When ignore interrupts config is set, we are creating a new context not
		// cancellable by parent context.
		var cancel context.CancelFunc
		ctx, cancel = util.IsolateContextFromParentContext(ctx)
		defer cancel()
	}
	// Find the parent.
	fs.mu.Lock()
	parent := fs.dirInodeOrDie(op.Parent)
	fs.mu.Unlock()

	// Find or create the child inode, locked.
	child, err := fs.lookUpOrCreateChildInode(ctx, parent, op.Name)
	if err != nil {
		return
	}

	// Set up a function that throws away the lookup count increment that we
	// implicitly did above (since we're not handing the child back to the
	// kernel) and unlocks the child, but only once. Ensure it is called at least
	// once in case we exit early.
	childCleanedUp := false
	cleanUpAndUnlockChild := func() {
		if !childCleanedUp {
			childCleanedUp = true
			fs.unlockAndDecrementLookupCount(child, 1)
		}
	}
	defer cleanUpAndUnlockChild()

	// Is the child a directory?
	childDir, ok := child.(inode.DirInode)
	if !ok {
		err = fuse.ENOTDIR
		return
	}

	// Ensure that the child directory is empty.
	//
	// Yes, this is not atomic with the delete below. See here for discussion:
	//
	//     https://github.com/GoogleCloudPlatform/gcsfuse/issues/9
	//
	//
	// Check for local file entries.
	fs.mu.Lock()
	localFileEntries := childDir.LocalFileEntries(fs.localFileInodes)
	fs.mu.Unlock()

	// Are there any local entries?
	if len(localFileEntries) != 0 {
		err = fuse.ENOTEMPTY
		return
	}

	// Check for entries on GCS.
	// Page through the listing; any entry at all means the dir is not empty.
	var tok string
	for {
		var entries []fuseutil.Dirent
		entries, tok, err = childDir.ReadEntries(ctx, tok)
		if err != nil {
			err = fmt.Errorf("ReadEntries: %w", err)
			return err
		}
		if fs.kernelListCacheTTL > 0 {
			// Clear kernel list cache after removing a directory. This ensures remote
			// GCS files are included in future directory listings for unlinking.
			childDir.InvalidateKernelListCache()
		}

		// Are there any entries?
		if len(entries) != 0 {
			err = fuse.ENOTEMPTY
			return
		}

		// Are we done listing?
		if tok == "" {
			break
		}
	}

	// We are done with the child.
	cleanUpAndUnlockChild()

	// Delete the backing object.
	fs.mu.Lock()
	_, isImplicitDir := fs.implicitDirInodes[child.Name()]
	fs.mu.Unlock()
	parent.Lock()
	err = parent.DeleteChildDir(ctx, op.Name, isImplicitDir, childDir)
	parent.Unlock()
	if err != nil {
		err = fmt.Errorf("DeleteChildDir: %w", err)
		return err
	}

	return
}

// LOCKS_EXCLUDED(fs.mu)
func (fs *fileSystem) Rename(
	ctx context.Context,
	op *fuseops.RenameOp) (err error) {
	if fs.newConfig.FileSystem.IgnoreInterrupts {
		// When ignore interrupts config is set, we are creating a new context not
		// cancellable by parent context.
		var cancel context.CancelFunc
		ctx, cancel = util.IsolateContextFromParentContext(ctx)
		defer cancel()
	}
	// Find the old and new parents.
	fs.mu.Lock()
	oldParent := fs.dirInodeOrDie(op.OldParent)
	newParent := fs.dirInodeOrDie(op.NewParent)
	fs.mu.Unlock()

	if oldInode, ok := oldParent.(inode.BucketOwnedInode); !ok {
		// The old parent is not owned by any bucket, which means it's the base
		// directory that holds all the buckets' root directories. So, this op
		// is to rename a bucket, which is not supported.
		return fmt.Errorf("rename a bucket: %w", syscall.ENOTSUP)
	} else {
		// The target path must exist in the same bucket.
		oldBucket := oldInode.Bucket().Name()
		if newInode, ok := newParent.(inode.BucketOwnedInode); !ok || oldBucket != newInode.Bucket().Name() {
			return fmt.Errorf("move out of bucket %q: %w", oldBucket, syscall.ENOTSUP)
		}
	}

	child, err := fs.lookUpOrCreateChildInode(ctx, oldParent, op.OldName)
	if err != nil {
		return err
	}
	if child == nil {
		return fuse.ENOENT
	}
	// We only needed the lookup to classify the child; drop the implicit
	// lookup-count increment and the lock before dispatching.
	child.DecrementLookupCount(1)
	child.Unlock()
	childBktOwned, ok := child.(inode.BucketOwnedInode)
	if !ok {
		// Won't happen in ideal case.
		return fmt.Errorf("child inode (id %v) is not owned by any bucket", child.ID())
	}
	if child.Name().IsDir() {
		// If 'enable-hns' flag is false, the bucket type is set to 'NonHierarchical' even for HNS buckets because the control client is nil.
		// Therefore, an additional 'enable hns' check is not required here.
		if childBktOwned.Bucket().BucketType().Hierarchical {
			return fs.renameHierarchicalDir(ctx, oldParent, op.OldName, newParent, op.NewName)
		}
		return fs.renameNonHierarchicalDir(ctx, oldParent, op.OldName, newParent, op.NewName)
	}
	childFileInode, ok := child.(*inode.FileInode)
	if !ok {
		return fmt.Errorf("child inode (id %v) is neither file nor directory inode", child.ID())
	}
	// TODO(b/402335988): Fix rename flow for local files when streaming writes is disabled.
	// If object to be renamed is a local file inode and streaming writes are disabled, rename operation is not supported.
	if childFileInode.IsLocal() && !fs.newConfig.Write.EnableStreamingWrites {
		return fmt.Errorf("cannot rename open file %q: %w", op.OldName, syscall.ENOTSUP)
	}
	return fs.renameFile(ctx, op, childFileInode, oldParent, newParent)
}

// LOCKS_EXCLUDED(oldParent)
// LOCKS_EXCLUDED(newParent)
func (fs *fileSystem) renameFile(ctx context.Context, op *fuseops.RenameOp, oldObject *inode.FileInode, oldParent, newParent inode.DirInode) error {
	// Flush any pending streaming writes first so we rename the latest
	// generation of the object.
	updatedMinObject, err := fs.flushPendingWrites(ctx, oldObject)
	if err != nil {
		return fmt.Errorf("flushPendingWrites: %w", err)
	}

	// HNS buckets with atomic rename enabled, and zonal buckets, support a
	// server-side rename; everything else falls back to clone-then-delete.
	if (oldObject.Bucket().BucketType().Hierarchical && fs.enableAtomicRenameObject) || oldObject.Bucket().BucketType().Zonal {
		return fs.renameHierarchicalFile(ctx, oldParent, op.OldName, updatedMinObject, newParent, op.NewName)
	}
	return fs.renameNonHierarchicalFile(ctx, oldParent, op.OldName, updatedMinObject, newParent, op.NewName)
}

// LOCKS_EXCLUDED(fileInode)
func (fs *fileSystem) flushPendingWrites(ctx context.Context, fileInode *inode.FileInode) (minObject *gcs.MinObject, err error) {
	// We will return modified minObject if flush is done, otherwise the original
	// minObject is returned. Original minObject is the one passed in the request.
	fileInode.Lock()
	defer fileInode.Unlock()
	minObject = fileInode.Source()
	if !fs.newConfig.Write.EnableStreamingWrites {
		return
	}

	// Try to flush if there are any pending writes. Re-read Source afterwards
	// to pick up the generation that the flush produced.
	err = fs.flushFile(ctx, fileInode)
	minObject = fileInode.Source()
	return
}

// LOCKS_EXCLUDED(oldParent)
// LOCKS_EXCLUDED(newParent)
func (fs *fileSystem) renameHierarchicalFile(ctx context.Context, oldParent inode.DirInode, oldName string, oldObject *gcs.MinObject, newParent inode.DirInode, newName string) error {
	oldParent.Lock()
	defer oldParent.Unlock()
	if newParent != oldParent {
		newParent.Lock()
		defer newParent.Unlock()
	}

	newFileName := inode.NewFileName(newParent.Name(), newName)

	if _, err := oldParent.RenameFile(ctx, oldObject, newFileName.GcsObjectName()); err != nil {
		return fmt.Errorf("renameFile: while renaming file: %w", err)
	}
	// NOTE(review): here the cache is invalidated with oldName (the basename),
	// while renameNonHierarchicalFile passes oldObject.Name (the full GCS
	// object name) — confirm which form InvalidateCache expects.
	if err := fs.invalidateChildFileCacheIfExist(oldParent, oldName); err != nil {
		return fmt.Errorf("renameHierarchicalFile: while invalidating cache for delete file: %w", err)
	}

	// Insert new file in type cache.
	newParent.InsertFileIntoTypeCache(newName)
	return nil
}

// LOCKS_EXCLUDED(oldParent)
// LOCKS_EXCLUDED(newParent)
func (fs *fileSystem) renameNonHierarchicalFile(
	ctx context.Context,
	oldParent inode.DirInode,
	oldName string,
	oldObject *gcs.MinObject,
	newParent inode.DirInode,
	newFileName string) error {
	// Clone into the new location.
	newParent.Lock()
	_, err := newParent.CloneToChildFile(ctx, newFileName, oldObject)
	newParent.Unlock()
	if err != nil {
		err = fmt.Errorf("CloneToChildFile: %w", err)
		return err
	}

	// Delete behind. Make sure to delete exactly the generation we cloned, in
	// case the referent of the name has changed in the meantime.
oldParent.Lock() err = oldParent.DeleteChildFile( ctx, oldName, oldObject.Generation, &oldObject.MetaGeneration) if err := fs.invalidateChildFileCacheIfExist(oldParent, oldObject.Name); err != nil { return fmt.Errorf("renameNonHierarchicalFile: while invalidating cache for delete file: %w", err) } oldParent.Unlock() if err != nil { err = fmt.Errorf("DeleteChildFile: %w", err) return err } return nil } func (fs *fileSystem) releaseInodes(inodes *[]inode.DirInode) { for _, in := range *inodes { fs.unlockAndDecrementLookupCount(in, 1) } *inodes = []inode.DirInode{} } func (fs *fileSystem) getBucketDirInode(ctx context.Context, parent inode.DirInode, name string) (inode.BucketOwnedDirInode, error) { dir, err := fs.lookUpOrCreateChildDirInode(ctx, parent, name) if err != nil { return nil, fmt.Errorf("lookup directory: %w", err) } return dir, nil } func (fs *fileSystem) ensureNoLocalFilesInDirectory(dir inode.BucketOwnedDirInode, name string) error { fs.mu.Lock() entries := dir.LocalFileEntries(fs.localFileInodes) fs.mu.Unlock() if len(entries) != 0 { return fmt.Errorf("can't rename directory %s with open files: %w", name, syscall.ENOTSUP) } return nil } func (fs *fileSystem) checkDirNotEmpty(dir inode.BucketOwnedDirInode, name string) error { unexpected, err := dir.ReadDescendants(context.Background(), 1) if err != nil { return fmt.Errorf("read descendants of the new directory %q: %w", name, err) } if len(unexpected) > 0 { return fuse.ENOTEMPTY } return nil } // Rename an old folder to a new folder in a hierarchical bucket. If the new folder already // exists and is non-empty, return ENOTEMPTY. If old folder have open files then return // ENOTSUP. 
//
// LOCKS_EXCLUDED(fs.mu)
// LOCKS_EXCLUDED(oldParent)
// LOCKS_EXCLUDED(newParent)
func (fs *fileSystem) renameHierarchicalDir(ctx context.Context, oldParent inode.DirInode, oldName string, newParent inode.DirInode, newName string) (err error) {
	// Set up a function that throws away the lookup count increment from
	// lookUpOrCreateChildInode (since the pending inodes are not sent back to
	// the kernel) and unlocks the pending inodes, but only once.
	var pendingInodes []inode.DirInode
	defer fs.releaseInodes(&pendingInodes)

	oldDirInode, err := fs.getBucketDirInode(ctx, oldParent, oldName)
	if err != nil {
		return err
	}
	pendingInodes = append(pendingInodes, oldDirInode)

	// Renaming a directory that still has local (unsynced) files is unsupported.
	if err = fs.ensureNoLocalFilesInDirectory(oldDirInode, oldName); err != nil {
		return err
	}

	oldDirName := inode.NewDirName(oldParent.Name(), oldName)
	newDirName := inode.NewDirName(newParent.Name(), newName)

	// If the call for getBucketDirInode fails it means directory does not exist.
	newDirInode, err := fs.getBucketDirInode(ctx, newParent, newName)
	if err == nil {
		// If the directory exists, then check if it is empty or not.
		if err = fs.checkDirNotEmpty(newDirInode, newName); err != nil {
			return err
		}

		// This refers to an empty destination directory.
		// The RenameFolder API does not allow renaming to an existing empty directory.
		// To make this work, we delete the empty directory first from gcsfuse and then perform rename.
		// NOTE(review): the DeleteChildDir error is deliberately discarded here;
		// if the delete failed, the RenameFolder call below is expected to fail
		// instead — confirm this best-effort behavior is intended.
		newParent.Lock()
		_ = newParent.DeleteChildDir(ctx, newName, false, newDirInode)
		newParent.Unlock()
		pendingInodes = append(pendingInodes, newDirInode)
	}

	// Note:The renameDirLimit is not utilized in the folder rename operation because there is no user-defined limit on new renames.
	oldParent.Lock()
	defer oldParent.Unlock()
	if newParent != oldParent {
		newParent.Lock()
		defer newParent.Unlock()
	}

	// Rename old directory to the new directory, keeping both parent directories locked.
	_, err = oldParent.RenameFolder(ctx, oldDirName.GcsObjectName(), newDirName.GcsObjectName())
	if err != nil {
		return fmt.Errorf("failed to rename folder: %w", err)
	}

	return
}

// Rename an old directory to a new directory in a non-hierarchical bucket. If the new directory already
// exists and is non-empty, return ENOTEMPTY.
//
// LOCKS_EXCLUDED(fs.mu)
// LOCKS_EXCLUDED(oldParent)
// LOCKS_EXCLUDED(newParent)
func (fs *fileSystem) renameNonHierarchicalDir(
	ctx context.Context,
	oldParent inode.DirInode,
	oldName string,
	newParent inode.DirInode,
	newName string) error {
	// Set up a function that throws away the lookup count increment from
	// lookUpOrCreateChildInode (since the pending inodes are not sent back to
	// the kernel) and unlocks the pending inodes, but only once
	var pendingInodes []inode.DirInode
	defer fs.releaseInodes(&pendingInodes)

	oldDir, err := fs.getBucketDirInode(ctx, oldParent, oldName)
	if err != nil {
		return err
	}
	pendingInodes = append(pendingInodes, oldDir)

	// Renaming a directory that still has local (unsynced) files is unsupported.
	if err = fs.ensureNoLocalFilesInDirectory(oldDir, oldName); err != nil {
		return err
	}

	// Fetch all the descendants of the old directory recursively.
	// Ask for one more than the limit so we can detect overflow below.
	descendants, err := oldDir.ReadDescendants(ctx, int(fs.renameDirLimit+1))
	if err != nil {
		return fmt.Errorf("read descendants of the old directory %q: %w", oldName, err)
	}
	if len(descendants) > int(fs.renameDirLimit) {
		return fmt.Errorf("too many objects to be renamed: %w", syscall.EMFILE)
	}

	// Create the backing object of the new directory.
	newParent.Lock()
	_, err = newParent.CreateChildDir(ctx, newName)
	newParent.Unlock()
	if err != nil {
		var preconditionErr *gcs.PreconditionError
		if errors.As(err, &preconditionErr) {
			// This means the new directory already exists, which is OK if
			// it is empty (checked below).
		} else {
			return fmt.Errorf("CreateChildDir: %w", err)
		}
	}

	newDir, err := fs.getBucketDirInode(ctx, newParent, newName)
	if err != nil {
		return err
	}
	pendingInodes = append(pendingInodes, newDir)

	if err = fs.checkDirNotEmpty(newDir, newName); err != nil {
		return err
	}

	// Move all the files from the old directory to the new directory, keeping both directories locked.
	for _, descendant := range descendants {
		// Derive the path of the descendant relative to the old directory.
		nameDiff := strings.TrimPrefix(descendant.FullName.GcsObjectName(), oldDir.Name().GcsObjectName())
		if nameDiff == descendant.FullName.GcsObjectName() {
			return fmt.Errorf("unwanted descendant %q not from dir %q", descendant.FullName, oldDir.Name())
		}

		o := descendant.MinObject
		if _, err := newDir.CloneToChildFile(ctx, nameDiff, o); err != nil {
			return fmt.Errorf("copy file %q: %w", o.Name, err)
		}
		if err := oldDir.DeleteChildFile(ctx, nameDiff, o.Generation, &o.MetaGeneration); err != nil {
			return fmt.Errorf("delete file %q: %w", o.Name, err)
		}
		if err = fs.invalidateChildFileCacheIfExist(oldDir, o.Name); err != nil {
			return fmt.Errorf("unlink: while invalidating cache for delete file: %w", err)
		}
	}

	// Release (unlock and decrement) both directory inodes before taking the
	// parent locks below, per lock-ordering rules.
	fs.releaseInodes(&pendingInodes)

	// Delete the backing object of the old directory.
	fs.mu.Lock()
	_, isImplicitDir := fs.implicitDirInodes[oldDir.Name()]
	fs.mu.Unlock()
	oldParent.Lock()
	err = oldParent.DeleteChildDir(ctx, oldName, isImplicitDir, oldDir)
	oldParent.Unlock()
	if err != nil {
		return fmt.Errorf("DeleteChildDir: %w", err)
	}

	return nil
}

// Unlink removes a file: it unlinks the inode (local or generation-backed)
// and, for GCS-backed files, deletes the backing object.
//
// LOCKS_EXCLUDED(fs.mu)
func (fs *fileSystem) Unlink(
	ctx context.Context,
	op *fuseops.UnlinkOp) (err error) {
	if fs.newConfig.FileSystem.IgnoreInterrupts {
		// When ignore interrupts config is set, we are creating a new context not
		// cancellable by parent context.
		var cancel context.CancelFunc
		ctx, cancel = util.IsolateContextFromParentContext(ctx)
		defer cancel()
	}
	fs.mu.Lock()
	// Find the parent and file name.
	parent := fs.dirInodeOrDie(op.Parent)
	fileName := inode.NewFileName(parent.Name(), op.Name)

	// Get the inode for the given file.
	// Files must have an associated inode, which can be found in either:
	// - localFileInodes: For files created locally.
	// - generationBackedInodes: For files backed by an object.
	// We are not checking implicitDirInodes or folderInodes because
	// the unlink operation is only applicable to files.
	in, isLocalFile := fs.localFileInodes[fileName]
	if !isLocalFile {
		in = fs.generationBackedInodes[fileName]
	}
	fs.mu.Unlock()

	if in != nil {
		// Perform the unlink operation on the inode.
		in.Lock()
		in.Unlink()
		in.Unlock()
	}

	// If the inode represents a local file, we don't need to delete
	// the backing object on GCS, so return early.
	if isLocalFile {
		return
	}

	// Delete the backing object present on GCS.
	parent.Lock()
	defer parent.Unlock()
	err = parent.DeleteChildFile(
		ctx,
		op.Name,
		0,   // Latest generation
		nil) // No meta-generation precondition

	if err != nil {
		err = fmt.Errorf("DeleteChildFile: %w", err)
		return err
	}

	if err := fs.invalidateChildFileCacheIfExist(parent, fileName.GcsObjectName()); err != nil {
		return fmt.Errorf("unlink: while invalidating cache for delete file: %w", err)
	}

	return
}

// OpenDir allocates a directory handle for the given directory inode.
//
// LOCKS_EXCLUDED(fs.mu)
func (fs *fileSystem) OpenDir(
	ctx context.Context,
	op *fuseops.OpenDirOp) (err error) {
	fs.mu.Lock()

	// Make sure the inode still exists and is a directory. If not, something has
	// screwed up because the VFS layer shouldn't have let us forget the inode
	// before opening it.
	in := fs.dirInodeOrDie(op.Inode)

	// Allocate a handle.
	handleID := fs.nextHandleID
	fs.nextHandleID++

	fs.handles[handleID] = handle.NewDirHandle(in, fs.implicitDirs)
	op.Handle = handleID

	fs.mu.Unlock()

	// Enables kernel list-cache in case of non-zero kernelListCacheTTL.
	if fs.kernelListCacheTTL > 0 {
		// Invalidates the kernel list-cache once the last cached response is out of
		// kernelListCacheTTL.
		op.KeepCache = !in.ShouldInvalidateKernelListCache(fs.kernelListCacheTTL)
		op.CacheDir = true
	}

	return
}

// ReadDir serves a directory listing request through the directory handle.
//
// LOCKS_EXCLUDED(fs.mu)
func (fs *fileSystem) ReadDir(
	ctx context.Context,
	op *fuseops.ReadDirOp) (err error) {
	if fs.newConfig.FileSystem.IgnoreInterrupts {
		// When ignore interrupts config is set, we are creating a new context not
		// cancellable by parent context.
		var cancel context.CancelFunc
		ctx, cancel = util.IsolateContextFromParentContext(ctx)
		defer cancel()
	}
	// Find the handle.
	fs.mu.Lock()
	dh := fs.handles[op.Handle].(*handle.DirHandle)
	in := fs.dirInodeOrDie(op.Inode)
	// Fetch local file entries beforehand and pass it to directory handle as
	// we need fs lock to fetch local file entries.
	localFileEntries := in.LocalFileEntries(fs.localFileInodes)
	fs.mu.Unlock()

	dh.Mu.Lock()
	defer dh.Mu.Unlock()

	// Serve the request.
	if err := dh.ReadDir(ctx, op, localFileEntries); err != nil {
		return err
	}

	return
}

// ReleaseDirHandle tears down a directory handle previously created by OpenDir.
//
// LOCKS_EXCLUDED(fs.mu)
func (fs *fileSystem) ReleaseDirHandle(
	ctx context.Context,
	op *fuseops.ReleaseDirHandleOp) (err error) {
	fs.mu.Lock()
	defer fs.mu.Unlock()

	// Sanity check that this handle exists and is of the correct type.
	_ = fs.handles[op.Handle].(*handle.DirHandle)

	// Clear the entry from the map.
	delete(fs.handles, op.Handle)

	return
}

// OpenFile allocates a file handle for the given file inode.
//
// LOCKS_EXCLUDED(fs.mu)
func (fs *fileSystem) OpenFile(
	ctx context.Context,
	op *fuseops.OpenFileOp) (err error) {
	fs.mu.Lock()
	// Find the inode.
	in := fs.fileInodeOrDie(op.Inode)
	// Follow lock ordering rules to get inode lock.
	// Inode lock is required to register fileHandle with the inode.
	fs.mu.Unlock()
	in.Lock()
	defer in.Unlock()

	// Get the fs lock again.
	fs.mu.Lock()
	defer fs.mu.Unlock()

	// Allocate a handle.
	handleID := fs.nextHandleID
	fs.nextHandleID++
	fs.handles[handleID] = handle.NewFileHandle(in, fs.fileCacheHandler, fs.cacheFileForRangeRead, fs.metricHandle, op.OpenFlags.IsReadOnly())
	op.Handle = handleID

	// When we observe object generations that we didn't create, we assign them
	// new inode IDs. So for a given inode, all modifications go through the
	// kernel. Therefore it's safe to tell the kernel to keep the page cache from
	// open to open for a given inode.
	op.KeepPageCache = true

	return
}

// ReadFile serves a read through the file handle.
//
// LOCKS_EXCLUDED(fs.mu)
func (fs *fileSystem) ReadFile(
	ctx context.Context,
	op *fuseops.ReadFileOp) (err error) {
	if fs.newConfig.FileSystem.IgnoreInterrupts {
		// When ignore interrupts config is set, we are creating a new context not
		// cancellable by parent context.
		var cancel context.CancelFunc
		ctx, cancel = util.IsolateContextFromParentContext(ctx)
		defer cancel()
	}
	// Save readOp in context for access in logs.
	ctx = context.WithValue(ctx, gcsx.ReadOp, op)

	// Find the handle and lock it.
	fs.mu.Lock()
	fh := fs.handles[op.Handle].(*handle.FileHandle)
	fs.mu.Unlock()

	fh.Lock()
	defer fh.Unlock()

	// Serve the read.
	op.Dst, op.BytesRead, err = fh.Read(ctx, op.Dst, op.Offset, fs.sequentialReadSizeMb)

	// As required by fuse, we don't treat EOF as an error.
	if err == io.EOF {
		err = nil
	}

	return
}

// ReadSymlink returns the target of a symlink inode.
//
// LOCKS_EXCLUDED(fs.mu)
func (fs *fileSystem) ReadSymlink(
	ctx context.Context,
	op *fuseops.ReadSymlinkOp) (err error) {
	// Find the inode.
	fs.mu.Lock()
	in := fs.symlinkInodeOrDie(op.Inode)
	fs.mu.Unlock()

	in.Lock()
	defer in.Unlock()

	// Serve the request.
	op.Target = in.Target()

	return
}

// WriteFile serves a write against a file inode.
//
// LOCKS_EXCLUDED(fs.mu)
func (fs *fileSystem) WriteFile(
	ctx context.Context,
	op *fuseops.WriteFileOp) (err error) {
	if fs.newConfig.FileSystem.IgnoreInterrupts {
		// When ignore interrupts config is set, we are creating a new context not
		// cancellable by parent context.
		var cancel context.CancelFunc
		ctx, cancel = util.IsolateContextFromParentContext(ctx)
		defer cancel()
	}
	// Find the inode.
	fs.mu.Lock()
	in := fs.fileInodeOrDie(op.Inode)
	fs.mu.Unlock()

	in.Lock()
	defer in.Unlock()

	err = fs.initBufferedWriteHandlerAndSyncFileIfEligible(ctx, in)
	if err != nil {
		return
	}

	// Serve the request.
	err = in.Write(ctx, op.Data, op.Offset)

	return
}

// SyncFile syncs the inode's contents to GCS. Non-file inodes are a no-op.
//
// LOCKS_EXCLUDED(fs.mu)
func (fs *fileSystem) SyncFile(
	ctx context.Context,
	op *fuseops.SyncFileOp) (err error) {
	if fs.newConfig.FileSystem.IgnoreInterrupts {
		// When ignore interrupts config is set, we are creating a new context not
		// cancellable by parent context.
		var cancel context.CancelFunc
		ctx, cancel = util.IsolateContextFromParentContext(ctx)
		defer cancel()
	}
	// Find the inode.
	fs.mu.Lock()
	in := fs.inodeOrDie(op.Inode)
	fs.mu.Unlock()

	// NOTE(review): the local `file` shadows the imported `file` package within
	// this function; legal, but worth renaming if this body ever needs that package.
	file, ok := in.(*inode.FileInode)
	if !ok {
		// No-op if the target is not a file
		return
	}

	file.Lock()
	defer file.Unlock()

	// Sync it.
	if err := fs.syncFile(ctx, file); err != nil {
		return err
	}

	return
}

// FlushFile flushes the file inode's contents to GCS.
//
// LOCKS_EXCLUDED(fs.mu)
func (fs *fileSystem) FlushFile(
	ctx context.Context,
	op *fuseops.FlushFileOp) (err error) {
	if fs.newConfig.FileSystem.IgnoreInterrupts {
		// When ignore interrupts config is set, we are creating a new context not
		// cancellable by parent context.
		var cancel context.CancelFunc
		ctx, cancel = util.IsolateContextFromParentContext(ctx)
		defer cancel()
	}
	// Find the inode.
	fs.mu.Lock()
	in := fs.fileInodeOrDie(op.Inode)
	fs.mu.Unlock()

	in.Lock()
	defer in.Unlock()

	// Sync it.
	if err := fs.flushFile(ctx, in); err != nil {
		return err
	}

	return
}

// ReleaseFileHandle destroys a file handle previously created by OpenFile.
//
// LOCKS_EXCLUDED(fs.mu)
func (fs *fileSystem) ReleaseFileHandle(
	ctx context.Context,
	op *fuseops.ReleaseFileHandleOp) (err error) {
	fs.mu.Lock()

	fileHandle := fs.handles[op.Handle].(*handle.FileHandle)

	// Update the map. We are okay updating the map before destroy is called
	// since destroy is doing only internal cleanup.
	delete(fs.handles, op.Handle)
	fs.mu.Unlock()

	// Destroy the handle.
	fileHandle.Lock()
	defer fileHandle.Unlock()
	fileHandle.Destroy()

	return
}

// GetXattr is unsupported; extended attributes are not implemented.
func (fs *fileSystem) GetXattr(
	ctx context.Context,
	op *fuseops.GetXattrOp) (err error) {
	return syscall.ENOSYS
}

// ListXattr is unsupported; extended attributes are not implemented.
func (fs *fileSystem) ListXattr(
	ctx context.Context,
	op *fuseops.ListXattrOp) error {
	return syscall.ENOSYS
}

// SyncFS is unsupported; per-file SyncFile/FlushFile handle persistence.
func (fs *fileSystem) SyncFS(
	ctx context.Context,
	op *fuseops.SyncFSOp) error {
	return syscall.ENOSYS
}