internal/fs/inode/dir.go (689 lines of code) (raw):

// Copyright 2015 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package inode import ( "errors" "fmt" "path" "strings" "time" "github.com/googlecloudplatform/gcsfuse/v2/internal/cache/metadata" "github.com/googlecloudplatform/gcsfuse/v2/internal/gcsx" "github.com/googlecloudplatform/gcsfuse/v2/internal/locker" "github.com/googlecloudplatform/gcsfuse/v2/internal/logger" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil" "github.com/jacobsa/fuse/fuseops" "github.com/jacobsa/fuse/fuseutil" "github.com/jacobsa/timeutil" "golang.org/x/net/context" "golang.org/x/sync/errgroup" ) // ListObjects call supports fetching upto 5000 results when projection is noAcl // via maxResults param in one call. When projection is set to full, it returns // 2000 results max. In GcsFuse flows we will be setting projection as noAcl. // By default 1000 results are returned if maxResults is not set. // Defining a constant to set maxResults param. const MaxResultsForListObjectsCall = 5000 // An inode representing a directory, with facilities for listing entries, // looking up children, and creating and deleting children. Must be locked for // any method additional to the Inode interface. type DirInode interface { Inode // Look up the direct child with the given relative name, returning // information about the object backing the child or whether it exists as an // implicit directory. If a file/symlink and a directory with the given name // both exist, the directory is preferred. Return nil result and a nil error // if neither is found. // // Special case: if the name ends in ConflictingFileNameSuffix, we strip the // suffix, confirm that a conflicting directory exists, then return a result // for the file/symlink. // // If this inode was created with implicitDirs is set, this method will use // ListObjects to find child directories that are "implicitly" defined by the // existence of their own descendents. For example, if there is an object // named "foo/bar/baz" and this is the directory "foo", a child directory // named "bar" will be implied. In this case, result.ImplicitDir will be // true. LookUpChild(ctx context.Context, name string) (*Core, error) // Rename the file. RenameFile(ctx context.Context, fileToRename *gcs.MinObject, destinationFileName string) (*gcs.Object, error) // Rename the directiory/folder. RenameFolder(ctx context.Context, folderName string, destinationFolderId string) (*gcs.Folder, error) // Read the children objects of this dir, recursively. The result count // is capped at the given limit. Internal caches are not refreshed from this // call. ReadDescendants(ctx context.Context, limit int) (map[Name]*Core, error) // Read some number of entries from the directory, returning a continuation // token that can be used to pick up the read operation where it left off. // Supply the empty token on the first call. // // At the end of the directory, the returned continuation token will be // empty. Otherwise it will be non-empty. There is no guarantee about the // number of entries returned; it may be zero even with a non-empty // continuation token. // // The contents of the Offset and Inode fields for returned entries is // undefined. ReadEntries( ctx context.Context, tok string) (entries []fuseutil.Dirent, newTok string, err error) // Create an empty child file with the supplied (relative) name, failing with // *gcs.PreconditionError if a backing object already exists in GCS. // Return the full name of the child and the GCS object it backs up. CreateChildFile(ctx context.Context, name string) (*Core, error) // CreateLocalChildFileCore returns an empty local child file core. CreateLocalChildFileCore(name string) (Core, error) // InsertFileIntoTypeCache adds the given name to type-cache InsertFileIntoTypeCache(name string) // EraseFromTypeCache removes the given name from type-cache EraseFromTypeCache(name string) // Like CreateChildFile, except clone the supplied source object instead of // creating an empty object. // Return the full name of the child and the GCS object it backs up. CloneToChildFile(ctx context.Context, name string, src *gcs.MinObject) (*Core, error) // Create a symlink object with the supplied (relative) name and the supplied // target, failing with *gcs.PreconditionError if a backing object already // exists in GCS. // Return the full name of the child and the GCS object it backs up. CreateChildSymlink(ctx context.Context, name string, target string) (*Core, error) // Create a backing object for a child directory with the supplied (relative) // name, failing with *gcs.PreconditionError if a backing object already // exists in GCS. // Return the full name of the child and the GCS object it backs up. CreateChildDir(ctx context.Context, name string) (*Core, error) // Delete the backing object for the child file or symlink with the given // (relative) name and generation number, where zero means the latest // generation. If the object/generation doesn't exist, no error is returned. // // metaGeneration may be set to a non-nil pointer giving a meta-generation // precondition, but need not be. DeleteChildFile( ctx context.Context, name string, generation int64, metaGeneration *int64) (err error) // Delete the backing object for the child directory with the given // (relative) name if it is not an Implicit Directory. DeleteChildDir( ctx context.Context, name string, isImplicitDir bool, dirInode DirInode) (err error) // LocalFileEntries lists the local files present in the directory. // Local means that the file is not yet present on GCS. LocalFileEntries(localFileInodes map[Name]Inode) (localEntries map[string]fuseutil.Dirent) // LockForChildLookup takes appropriate kind of lock when an inode's child is // looked up. LockForChildLookup() // UnlockForChildLookup unlocks the lock taken with LockForChildLookup. UnlockForChildLookup() // ShouldInvalidateKernelListCache tells the filesystem whether kernel list-cache // should be invalidated or not. ShouldInvalidateKernelListCache(ttl time.Duration) bool // InvalidateKernelListCache guarantees that the subsequent list call will be // served from GCSFuse. InvalidateKernelListCache() // RLock readonly lock. RLock() // RUnlock readonly unlock. RUnlock() IsUnlinked() bool Unlink() } // An inode that represents a directory from a GCS bucket. type BucketOwnedDirInode interface { DirInode BucketOwnedInode } type dirInode struct { ///////////////////////// // Dependencies ///////////////////////// bucket *gcsx.SyncerBucket mtimeClock timeutil.Clock cacheClock timeutil.Clock ///////////////////////// // Constant data ///////////////////////// id fuseops.InodeID implicitDirs bool includeFoldersAsPrefixes bool enableNonexistentTypeCache bool // INVARIANT: name.IsDir() name Name attrs fuseops.InodeAttributes ///////////////////////// // Mutable state ///////////////////////// // A RW mutex that must be held when calling certain methods. See // documentation for each method. mu locker.RWLocker // GUARDED_BY(mu) lc lookupCount // cache.CheckInvariants() does not panic. // // GUARDED_BY(mu) cache metadata.TypeCache // prevDirListingTimeStamp is the time stamp of previous listing when user asked // (via kernel) the directory listing from the filesystem. // Specially used when kernelListCacheTTL > 0 that means kernel list-cache is // enabled. prevDirListingTimeStamp time.Time isHNSEnabled bool // Represents if folder has been unlinked in hierarchical bucket. This is not getting used in // non-hierarchical bucket. unlinked bool } var _ DirInode = &dirInode{} // Create a directory inode for the name, representing the directory containing // the objects for which it is an immediate prefix. For the root directory, // this is the empty string. // // If implicitDirs is set, LookUpChild will use ListObjects to find child // directories that are "implicitly" defined by the existence of their own // descendents. For example, if there is an object named "foo/bar/baz" and this // is the directory "foo", a child directory named "bar" will be implied. // // If typeCacheTTL is non-zero, a cache from child name to information about // whether that name exists as a file/symlink and/or directory will be // maintained. This may speed up calls to LookUpChild, especially when combined // with a stat-caching GCS bucket, but comes at the cost of consistency: if the // child is removed and recreated with a different type before the expiration, // we may fail to find it. // // The initial lookup count is zero. // // REQUIRES: name.IsDir() func NewDirInode( id fuseops.InodeID, name Name, attrs fuseops.InodeAttributes, implicitDirs bool, includeFoldersAsPrefixes bool, enableNonexistentTypeCache bool, typeCacheTTL time.Duration, bucket *gcsx.SyncerBucket, mtimeClock timeutil.Clock, cacheClock timeutil.Clock, typeCacheMaxSizeMB int64, isHNSEnabled bool, ) (d DirInode) { if !name.IsDir() { panic(fmt.Sprintf("Unexpected name: %s", name)) } typed := &dirInode{ bucket: bucket, mtimeClock: mtimeClock, cacheClock: cacheClock, id: id, implicitDirs: implicitDirs, includeFoldersAsPrefixes: includeFoldersAsPrefixes, enableNonexistentTypeCache: enableNonexistentTypeCache, name: name, attrs: attrs, cache: metadata.NewTypeCache(typeCacheMaxSizeMB, typeCacheTTL), isHNSEnabled: isHNSEnabled, unlinked: false, } typed.lc.Init(id) // Set up invariant checking. typed.mu = locker.NewRW(name.GcsObjectName(), typed.checkInvariants) d = typed return } //////////////////////////////////////////////////////////////////////// // Helpers //////////////////////////////////////////////////////////////////////// func (d *dirInode) checkInvariants() { // INVARIANT: d.name.IsDir() if !d.name.IsDir() { panic(fmt.Sprintf("Unexpected name: %s", d.name)) } } func (d *dirInode) lookUpChildFile(ctx context.Context, name string) (*Core, error) { return findExplicitInode(ctx, d.Bucket(), NewFileName(d.Name(), name)) } func (d *dirInode) lookUpChildDir(ctx context.Context, name string) (*Core, error) { childName := NewDirName(d.Name(), name) if d.isBucketHierarchical() { return findExplicitFolder(ctx, d.Bucket(), childName) } if d.implicitDirs { return findDirInode(ctx, d.Bucket(), childName) } return findExplicitInode(ctx, d.Bucket(), childName) } // Look up the file for a (file, dir) pair with conflicting names, overriding // the default behavior. If the file doesn't exist, return a nil record with a // nil error. If the directory doesn't exist, pretend the file doesn't exist. // // REQUIRES: strings.HasSuffix(name, ConflictingFileNameSuffix) func (d *dirInode) lookUpConflicting(ctx context.Context, name string) (*Core, error) { strippedName := strings.TrimSuffix(name, ConflictingFileNameSuffix) // In order to a marked name to be accepted, we require the conflicting // directory to exist. result, err := d.lookUpChildDir(ctx, strippedName) if err != nil { return nil, fmt.Errorf("lookUpChildDir for stripped name: %w", err) } if result == nil { return nil, nil } // The directory name exists. Find the conflicting file. // Overwrite the result. result, err = d.lookUpChildFile(ctx, strippedName) if err != nil { return nil, fmt.Errorf("lookUpChildFile for stripped name: %w", err) } return result, nil } // findExplicitInode finds the file or dir inode core backed by an explicit // object in GCS with the given name. Return nil if such object does not exist. func findExplicitInode(ctx context.Context, bucket *gcsx.SyncerBucket, name Name) (*Core, error) { // Call the bucket. req := &gcs.StatObjectRequest{ Name: name.GcsObjectName(), } m, _, err := bucket.StatObject(ctx, req) // Suppress "not found" errors. var gcsErr *gcs.NotFoundError if errors.As(err, &gcsErr) { return nil, nil } // Annotate others. if err != nil { return nil, fmt.Errorf("StatObject: %w", err) } return &Core{ Bucket: bucket, FullName: name, MinObject: m, }, nil } func findExplicitFolder(ctx context.Context, bucket *gcsx.SyncerBucket, name Name) (*Core, error) { folder, err := bucket.GetFolder(ctx, name.GcsObjectName()) // Suppress "not found" errors. var gcsErr *gcs.NotFoundError if errors.As(err, &gcsErr) { return nil, nil } // Annotate others. if folder == nil { return nil, fmt.Errorf("error in get folder for lookup : %w", err) } return &Core{ Bucket: bucket, FullName: name, Folder: folder, }, nil } // findDirInode finds the dir inode core where the directory is either explicit // or implicit. Returns nil if no such directory exists. func findDirInode(ctx context.Context, bucket *gcsx.SyncerBucket, name Name) (*Core, error) { if !name.IsDir() { return nil, fmt.Errorf("%q is not directory", name) } req := &gcs.ListObjectsRequest{ Prefix: name.GcsObjectName(), MaxResults: 1, } listing, err := bucket.ListObjects(ctx, req) if err != nil { return nil, fmt.Errorf("list objects: %w", err) } if len(listing.MinObjects) == 0 { return nil, nil } result := &Core{ Bucket: bucket, FullName: name, } if o := listing.MinObjects[0]; o.Name == name.GcsObjectName() { result.MinObject = o } return result, nil } // Fail if the name already exists. Pass on errors directly. func (d *dirInode) createNewObject( ctx context.Context, name Name, metadata map[string]string) (o *gcs.Object, err error) { // Create an empty backing object for the child, failing if it already // exists. var precond int64 createReq := &gcs.CreateObjectRequest{ Name: name.GcsObjectName(), Contents: strings.NewReader(""), GenerationPrecondition: &precond, Metadata: metadata, } o, err = d.bucket.CreateObject(ctx, createReq) if err != nil { return } return } //////////////////////////////////////////////////////////////////////// // Public interface //////////////////////////////////////////////////////////////////////// func (d *dirInode) Lock() { d.mu.Lock() } func (d *dirInode) Unlock() { d.mu.Unlock() } func (d *dirInode) RLock() { d.mu.RLock() } func (d *dirInode) RUnlock() { d.mu.RUnlock() } // LockForChildLookup takes read-only lock on inode when the inode's child is // looked up. It is safe to take read-only lock to allow parallel lookups of // children because (a) during lookup, GCS is only read (list/stat), so as long // as GCS is not changed remotely, lookup will be consistent (b) all the other // directory level operations (read or write type) take exclusive locks. func (d *dirInode) LockForChildLookup() { d.mu.RLock() } func (d *dirInode) UnlockForChildLookup() { d.mu.RUnlock() } func (d *dirInode) ID() fuseops.InodeID { return d.id } func (d *dirInode) Name() Name { return d.name } // LOCKS_REQUIRED(d) func (d *dirInode) IncrementLookupCount() { d.lc.Inc() } // LOCKS_REQUIRED(d) func (d *dirInode) DecrementLookupCount(n uint64) (destroy bool) { destroy = d.lc.Dec(n) return } // LOCKS_REQUIRED(d) func (d *dirInode) Destroy() (err error) { // Nothing interesting to do. return } // LOCKS_REQUIRED(d) func (d *dirInode) Attributes( ctx context.Context) (attrs fuseops.InodeAttributes, err error) { // Set up basic attributes. attrs = d.attrs attrs.Nlink = 1 return } func (d *dirInode) Bucket() *gcsx.SyncerBucket { return d.bucket } // A suffix that can be used to unambiguously tag a file system name. // (Unambiguous because U+000A is not allowed in GCS object names.) This is // used to refer to the file/symlink in a (file/symlink, directory) pair with // conflicting object names. // // See also the notes on DirInode.LookUpChild. const ConflictingFileNameSuffix = "\n" // LOCKS_REQUIRED(d) func (d *dirInode) LookUpChild(ctx context.Context, name string) (*Core, error) { // Is this a conflict marker name? if strings.HasSuffix(name, ConflictingFileNameSuffix) { return d.lookUpConflicting(ctx, name) } group, ctx := errgroup.WithContext(ctx) var fileResult *Core var dirResult *Core lookUpFile := func() (err error) { fileResult, err = findExplicitInode(ctx, d.Bucket(), NewFileName(d.Name(), name)) return } lookUpExplicitDir := func() (err error) { dirResult, err = findExplicitInode(ctx, d.Bucket(), NewDirName(d.Name(), name)) return } lookUpImplicitOrExplicitDir := func() (err error) { dirResult, err = findDirInode(ctx, d.Bucket(), NewDirName(d.Name(), name)) return } lookUpHNSDir := func() (err error) { dirResult, err = findExplicitFolder(ctx, d.Bucket(), NewDirName(d.Name(), name)) return } cachedType := d.cache.Get(d.cacheClock.Now(), name) switch cachedType { case metadata.ImplicitDirType: dirResult = &Core{ Bucket: d.Bucket(), FullName: NewDirName(d.Name(), name), MinObject: nil, } case metadata.ExplicitDirType: if d.isBucketHierarchical() { group.Go(lookUpHNSDir) } else { group.Go(lookUpExplicitDir) } case metadata.RegularFileType, metadata.SymlinkType: group.Go(lookUpFile) case metadata.NonexistentType: return nil, nil case metadata.UnknownType: group.Go(lookUpFile) if d.isBucketHierarchical() { group.Go(lookUpHNSDir) } else { if d.implicitDirs { group.Go(lookUpImplicitOrExplicitDir) } else { group.Go(lookUpExplicitDir) } } } if err := group.Wait(); err != nil { return nil, err } var result *Core if dirResult != nil { result = dirResult } else if fileResult != nil { result = fileResult } if result != nil { d.cache.Insert(d.cacheClock.Now(), name, result.Type()) } else if d.enableNonexistentTypeCache && cachedType == metadata.UnknownType { d.cache.Insert(d.cacheClock.Now(), name, metadata.NonexistentType) } return result, nil } func (d *dirInode) IsUnlinked() bool { return d.unlinked } func (d *dirInode) Unlink() { d.unlinked = true } // LOCKS_REQUIRED(d) func (d *dirInode) ReadDescendants(ctx context.Context, limit int) (map[Name]*Core, error) { var tok string descendants := make(map[Name]*Core) for { listing, err := d.bucket.ListObjects(ctx, &gcs.ListObjectsRequest{ Delimiter: "", // recursively Prefix: d.Name().GcsObjectName(), ContinuationToken: tok, MaxResults: limit + 1, // to exclude itself }) if err != nil { return nil, fmt.Errorf("list objects: %w", err) } for _, o := range listing.MinObjects { if len(descendants) >= limit { return descendants, nil } // skip the current directory if o.Name == d.Name().GcsObjectName() { continue } name := NewDescendantName(d.Name(), o.Name) descendants[name] = &Core{ Bucket: d.Bucket(), FullName: name, MinObject: o, } } // Are we done listing? if tok = listing.ContinuationToken; tok == "" { return descendants, nil } } } // LOCKS_REQUIRED(d) func (d *dirInode) readObjects( ctx context.Context, tok string) (cores map[Name]*Core, newTok string, err error) { if d.isBucketHierarchical() { d.includeFoldersAsPrefixes = true } // Ask the bucket to list some objects. req := &gcs.ListObjectsRequest{ Delimiter: "/", IncludeTrailingDelimiter: true, Prefix: d.Name().GcsObjectName(), ContinuationToken: tok, MaxResults: MaxResultsForListObjectsCall, // Setting Projection param to noAcl since fetching owner and acls are not // required. ProjectionVal: gcs.NoAcl, IncludeFoldersAsPrefixes: d.includeFoldersAsPrefixes, } listing, err := d.bucket.ListObjects(ctx, req) if err != nil { err = fmt.Errorf("ListObjects: %w", err) return } cores = make(map[Name]*Core) defer func() { now := d.cacheClock.Now() for fullName, c := range cores { d.cache.Insert(now, path.Base(fullName.LocalName()), c.Type()) } }() for _, o := range listing.MinObjects { // Skip empty results or the directory object backing this inode. if o.Name == d.Name().GcsObjectName() || o.Name == "" { continue } nameBase := path.Base(o.Name) // ie. "bar" from "foo/bar/" or "foo/bar" // Given the alphabetical order of the objects, if a file "foo" and // directory "foo/" coexist, the directory would eventually occupy // the value of records["foo"]. if strings.HasSuffix(o.Name, "/") { // In a hierarchical bucket, create a folder entry instead of a minObject for each prefix. // This is because in a hierarchical bucket, every directory is considered a folder. // Adding folder entries while looping to through CollapsedRuns instead of here to avoid duplicate entries. if !d.isBucketHierarchical() { dirName := NewDirName(d.Name(), nameBase) explicitDir := &Core{ Bucket: d.Bucket(), FullName: dirName, MinObject: o, } cores[dirName] = explicitDir } } else { fileName := NewFileName(d.Name(), nameBase) file := &Core{ Bucket: d.Bucket(), FullName: fileName, MinObject: o, } cores[fileName] = file } } // Return an appropriate continuation token, if any. newTok = listing.ContinuationToken if !d.implicitDirs && !d.isBucketHierarchical() { return } // Add implicit directories into the result. unsupportedPrefixes := []string{} for _, p := range listing.CollapsedRuns { pathBase := path.Base(p) if storageutil.IsUnsupportedObjectName(p) { unsupportedPrefixes = append(unsupportedPrefixes, p) } dirName := NewDirName(d.Name(), pathBase) if d.isBucketHierarchical() { folder := gcs.Folder{Name: dirName.objectName} folderCore := &Core{ Bucket: d.Bucket(), FullName: dirName, Folder: &folder, } cores[dirName] = folderCore } else { if c, ok := cores[dirName]; ok && c.Type() == metadata.ExplicitDirType { continue } implicitDir := &Core{ Bucket: d.Bucket(), FullName: dirName, MinObject: nil, } cores[dirName] = implicitDir } } if len(unsupportedPrefixes) > 0 { logger.Errorf("Encountered unsupported prefixes during listing: %v", unsupportedPrefixes) } return } func (d *dirInode) ReadEntries( ctx context.Context, tok string) (entries []fuseutil.Dirent, newTok string, err error) { var cores map[Name]*Core cores, newTok, err = d.readObjects(ctx, tok) if err != nil { err = fmt.Errorf("read objects: %w", err) return } for fullName, core := range cores { entry := fuseutil.Dirent{ Name: path.Base(fullName.LocalName()), Type: fuseutil.DT_Unknown, } switch core.Type() { case metadata.SymlinkType: entry.Type = fuseutil.DT_Link case metadata.RegularFileType: entry.Type = fuseutil.DT_File case metadata.ImplicitDirType, metadata.ExplicitDirType: entry.Type = fuseutil.DT_Directory } entries = append(entries, entry) } d.prevDirListingTimeStamp = d.cacheClock.Now() return } // LOCKS_REQUIRED(d) func (d *dirInode) CreateChildFile(ctx context.Context, name string) (*Core, error) { childMetadata := map[string]string{ FileMtimeMetadataKey: d.mtimeClock.Now().UTC().Format(time.RFC3339Nano), } fullName := NewFileName(d.Name(), name) o, err := d.createNewObject(ctx, fullName, childMetadata) if err != nil { return nil, err } m := storageutil.ConvertObjToMinObject(o) d.cache.Insert(d.cacheClock.Now(), name, metadata.RegularFileType) return &Core{ Bucket: d.Bucket(), FullName: fullName, MinObject: m, }, nil } func (d *dirInode) CreateLocalChildFileCore(name string) (Core, error) { return Core{ Bucket: d.Bucket(), FullName: NewFileName(d.Name(), name), MinObject: nil, Local: true, }, nil } // LOCKS_REQUIRED(d) func (d *dirInode) InsertFileIntoTypeCache(name string) { d.cache.Insert(d.cacheClock.Now(), name, metadata.RegularFileType) } // LOCKS_REQUIRED(d) func (d *dirInode) EraseFromTypeCache(name string) { d.cache.Erase(name) } // LOCKS_REQUIRED(d) func (d *dirInode) CloneToChildFile(ctx context.Context, name string, src *gcs.MinObject) (*Core, error) { // Erase any existing type information for this name. d.cache.Erase(name) fullName := NewFileName(d.Name(), name) // Clone over anything that might already exist for the name. o, err := d.bucket.CopyObject( ctx, &gcs.CopyObjectRequest{ SrcName: src.Name, SrcGeneration: src.Generation, SrcMetaGenerationPrecondition: &src.MetaGeneration, DstName: fullName.GcsObjectName(), }) if err != nil { return nil, err } m := storageutil.ConvertObjToMinObject(o) c := &Core{ Bucket: d.Bucket(), FullName: fullName, MinObject: m, } d.cache.Insert(d.cacheClock.Now(), name, c.Type()) return c, nil } // LOCKS_REQUIRED(d) func (d *dirInode) CreateChildSymlink(ctx context.Context, name string, target string) (*Core, error) { fullName := NewFileName(d.Name(), name) childMetadata := map[string]string{ SymlinkMetadataKey: target, } o, err := d.createNewObject(ctx, fullName, childMetadata) if err != nil { return nil, err } m := storageutil.ConvertObjToMinObject(o) d.cache.Insert(d.cacheClock.Now(), name, metadata.SymlinkType) return &Core{ Bucket: d.Bucket(), FullName: fullName, MinObject: m, }, nil } // LOCKS_REQUIRED(d) func (d *dirInode) CreateChildDir(ctx context.Context, name string) (*Core, error) { // Generate the full name for the new directory. fullName := NewDirName(d.Name(), name) var m *gcs.MinObject var f *gcs.Folder var err error // Check the bucket type. if d.isBucketHierarchical() { // For hierarchical buckets, create a folder. f, err = d.bucket.CreateFolder(ctx, fullName.objectName) if err != nil { return nil, err } } else { var o *gcs.Object // For non-hierarchical buckets, create a new object. o, err = d.createNewObject(ctx, fullName, nil) if err != nil { return nil, err } // Convert the object to a minimal object. m = storageutil.ConvertObjToMinObject(o) } // Insert the new directory into the type cache. d.cache.Insert(d.cacheClock.Now(), name, metadata.ExplicitDirType) return &Core{ Bucket: d.Bucket(), FullName: fullName, MinObject: m, Folder: f, }, nil } // LOCKS_REQUIRED(d) func (d *dirInode) DeleteChildFile( ctx context.Context, name string, generation int64, metaGeneration *int64) (err error) { d.cache.Erase(name) childName := NewFileName(d.Name(), name) err = d.bucket.DeleteObject( ctx, &gcs.DeleteObjectRequest{ Name: childName.GcsObjectName(), Generation: generation, MetaGenerationPrecondition: metaGeneration, }) if err != nil { err = fmt.Errorf("DeleteObject: %w", err) return } d.cache.Erase(name) return } // LOCKS_REQUIRED(d) func (d *dirInode) DeleteChildDir( ctx context.Context, name string, isImplicitDir bool, dirInode DirInode) error { d.cache.Erase(name) // If the directory is an implicit directory, then no backing object // exists in the gcs bucket, so returning from here. // Hierarchical buckets don't have implicit dirs so this will be always false in hierarchical bucket case. if isImplicitDir { return nil } childName := NewDirName(d.Name(), name) // Delete the backing object. Unfortunately we have no way to precondition // this on the directory being empty. err := d.bucket.DeleteObject( ctx, &gcs.DeleteObjectRequest{ Name: childName.GcsObjectName(), Generation: 0, // Delete the latest version of object named after dir. }) if !d.isBucketHierarchical() { if err != nil { return fmt.Errorf("DeleteObject: %w", err) } d.cache.Erase(name) return nil } // Ignoring delete object error here, as in case of hns there is no way of knowing // if underlying placeholder object exists or not in Hierarchical bucket. // The DeleteFolder operation handles removing empty folders. if err = d.bucket.DeleteFolder(ctx, childName.GcsObjectName()); err != nil { return fmt.Errorf("DeleteFolder: %w", err) } if d.isBucketHierarchical() { dirInode.Unlink() } d.cache.Erase(name) return nil } // LOCKS_REQUIRED(fs) func (d *dirInode) LocalFileEntries(localFileInodes map[Name]Inode) (localEntries map[string]fuseutil.Dirent) { localEntries = make(map[string]fuseutil.Dirent) for localInodeName, in := range localFileInodes { // It is possible that the local file inode has been unlinked, but // still present in localFileInodes map because of open file handle. // So, if the inode has been unlinked, skip the entry. file, ok := in.(*FileInode) if ok && file.IsUnlinked() { continue } if localInodeName.IsDirectChildOf(d.Name()) { entry := fuseutil.Dirent{ Name: path.Base(localInodeName.LocalName()), Type: fuseutil.DT_File, } localEntries[entry.Name] = entry } } return } // ShouldInvalidateKernelListCache doesn't require any lock as d.prevDirListingTimeStamp // is concurrency safe, and we are okay with the in-consistent value. func (d *dirInode) ShouldInvalidateKernelListCache(ttl time.Duration) bool { // prevDirListingTimeStamp.IsZero() true means listing has not happened yet, and we should // invalidate for clean start. if d.prevDirListingTimeStamp.IsZero() { return true } cachedDuration := d.cacheClock.Now().Sub(d.prevDirListingTimeStamp) return cachedDuration >= ttl } // LOCKS_REQUIRED(d) // LOCKS_REQUIRED(parent of destinationFileName) func (d *dirInode) RenameFile(ctx context.Context, fileToRename *gcs.MinObject, destinationFileName string) (*gcs.Object, error) { req := &gcs.MoveObjectRequest{ SrcName: fileToRename.Name, DstName: destinationFileName, SrcGeneration: fileToRename.Generation, SrcMetaGenerationPrecondition: &fileToRename.MetaGeneration, } o, err := d.bucket.MoveObject(ctx, req) // Invalidate the cache entry for the old object name. d.cache.Erase(fileToRename.Name) return o, err } func (d *dirInode) RenameFolder(ctx context.Context, folderName string, destinationFolderName string) (*gcs.Folder, error) { folder, err := d.bucket.RenameFolder(ctx, folderName, destinationFolderName) if err != nil { return nil, err } // TODO: Cache updates won't be necessary once type cache usage is removed from HNS. // Remove old entry from type cache. d.cache.Erase(folderName) return folder, nil } func (d *dirInode) InvalidateKernelListCache() { // Set prevDirListingTimeStamp to Zero time so that cache is invalidated. d.prevDirListingTimeStamp = time.Time{} } func (d *dirInode) isBucketHierarchical() bool { if d.isHNSEnabled && d.bucket.BucketType().Hierarchical { return true } return false }