in internal/fs/fs.go [2307:2397]
func (fs *fileSystem) renameNonHierarchicalDir(
ctx context.Context,
oldParent inode.DirInode,
oldName string,
newParent inode.DirInode,
newName string) error {
// Set up a function that throws away the lookup count increment from
// lookUpOrCreateChildInode (since the pending inodes are not sent back to
// the kernel) and unlocks the pending inodes, but only once
var pendingInodes []inode.DirInode
defer fs.releaseInodes(&pendingInodes)
oldDir, err := fs.getBucketDirInode(ctx, oldParent, oldName)
if err != nil {
return err
}
pendingInodes = append(pendingInodes, oldDir)
if err = fs.ensureNoLocalFilesInDirectory(oldDir, oldName); err != nil {
return err
}
// Fetch all the descendants of the old directory recursively
descendants, err := oldDir.ReadDescendants(ctx, int(fs.renameDirLimit+1))
if err != nil {
return fmt.Errorf("read descendants of the old directory %q: %w", oldName, err)
}
if len(descendants) > int(fs.renameDirLimit) {
return fmt.Errorf("too many objects to be renamed: %w", syscall.EMFILE)
}
// Create the backing object of the new directory.
newParent.Lock()
_, err = newParent.CreateChildDir(ctx, newName)
newParent.Unlock()
if err != nil {
var preconditionErr *gcs.PreconditionError
if errors.As(err, &preconditionErr) {
// This means the new directory already exists, which is OK if
// it is empty (checked below).
} else {
return fmt.Errorf("CreateChildDir: %w", err)
}
}
newDir, err := fs.getBucketDirInode(ctx, newParent, newName)
if err != nil {
return err
}
pendingInodes = append(pendingInodes, newDir)
if err = fs.checkDirNotEmpty(newDir, newName); err != nil {
return err
}
// Move all the files from the old directory to the new directory, keeping both directories locked.
for _, descendant := range descendants {
nameDiff := strings.TrimPrefix(descendant.FullName.GcsObjectName(), oldDir.Name().GcsObjectName())
if nameDiff == descendant.FullName.GcsObjectName() {
return fmt.Errorf("unwanted descendant %q not from dir %q", descendant.FullName, oldDir.Name())
}
o := descendant.MinObject
if _, err := newDir.CloneToChildFile(ctx, nameDiff, o); err != nil {
return fmt.Errorf("copy file %q: %w", o.Name, err)
}
if err := oldDir.DeleteChildFile(ctx, nameDiff, o.Generation, &o.MetaGeneration); err != nil {
return fmt.Errorf("delete file %q: %w", o.Name, err)
}
if err = fs.invalidateChildFileCacheIfExist(oldDir, o.Name); err != nil {
return fmt.Errorf("unlink: while invalidating cache for delete file: %w", err)
}
}
fs.releaseInodes(&pendingInodes)
// Delete the backing object of the old directory.
fs.mu.Lock()
_, isImplicitDir := fs.implicitDirInodes[oldDir.Name()]
fs.mu.Unlock()
oldParent.Lock()
err = oldParent.DeleteChildDir(ctx, oldName, isImplicitDir, oldDir)
oldParent.Unlock()
if err != nil {
return fmt.Errorf("DeleteChildDir: %w", err)
}
return nil
}