func()

in internal/fs/fs.go [2307:2397]


// renameNonHierarchicalDir renames the directory oldName under oldParent to
// newName under newParent by moving its descendants one object at a time:
// each descendant is cloned under the new directory and then deleted from the
// old one.
//
// The rename is refused with an error wrapping syscall.EMFILE when the old
// directory has more than fs.renameDirLimit descendants. A precondition
// failure while creating the new directory is tolerated (the directory already
// exists); the destination is then vetted by checkDirNotEmpty.
//
// The operation is not atomic: an error part-way through the move returns
// immediately, leaving the descendants handled so far under the new directory.
func (fs *fileSystem) renameNonHierarchicalDir(
	ctx context.Context,
	oldParent inode.DirInode,
	oldName string,
	newParent inode.DirInode,
	newName string) error {

	// Directory inodes obtained below are collected here and handed to
	// releaseInodes, which throws away the lookup count increment from
	// lookUpOrCreateChildInode (since the pending inodes are not sent back to
	// the kernel) and unlocks them. The deferred call covers every early
	// return; the explicit call further down releases them before the old
	// directory's backing object is deleted.
	// NOTE(review): this relies on releaseInodes releasing each inode only
	// once across both calls - confirm against its implementation.
	var pendingInodes []inode.DirInode
	defer fs.releaseInodes(&pendingInodes)

	oldDir, err := fs.getBucketDirInode(ctx, oldParent, oldName)
	if err != nil {
		return err
	}
	pendingInodes = append(pendingInodes, oldDir)

	if err = fs.ensureNoLocalFilesInDirectory(oldDir, oldName); err != nil {
		return err
	}

	// Fetch all the descendants of the old directory recursively. One more than
	// the limit is requested so that exceeding the limit can be detected.
	descendants, err := oldDir.ReadDescendants(ctx, int(fs.renameDirLimit+1))
	if err != nil {
		return fmt.Errorf("read descendants of the old directory %q: %w", oldName, err)
	}
	if len(descendants) > int(fs.renameDirLimit) {
		return fmt.Errorf("too many objects to be renamed: %w", syscall.EMFILE)
	}

	// Create the backing object of the new directory.
	newParent.Lock()
	_, err = newParent.CreateChildDir(ctx, newName)
	newParent.Unlock()
	if err != nil {
		// A precondition error means the new directory already exists, which is
		// OK if it is empty (checked below). Any other error aborts the rename.
		var preconditionErr *gcs.PreconditionError
		if !errors.As(err, &preconditionErr) {
			return fmt.Errorf("CreateChildDir: %w", err)
		}
	}

	newDir, err := fs.getBucketDirInode(ctx, newParent, newName)
	if err != nil {
		return err
	}
	pendingInodes = append(pendingInodes, newDir)

	if err = fs.checkDirNotEmpty(newDir, newName); err != nil {
		return err
	}

	// Move all the files from the old directory to the new directory, keeping both directories locked.
	oldDirPrefix := oldDir.Name().GcsObjectName()
	for _, descendant := range descendants {
		// The descendant's name relative to the old directory is its full object
		// name with the directory prefix stripped. If stripping changes nothing,
		// the object does not live under the old directory.
		fullName := descendant.FullName.GcsObjectName()
		nameDiff := strings.TrimPrefix(fullName, oldDirPrefix)
		if nameDiff == fullName {
			return fmt.Errorf("unwanted descendant %q not from dir %q", descendant.FullName, oldDir.Name())
		}

		o := descendant.MinObject
		if _, err := newDir.CloneToChildFile(ctx, nameDiff, o); err != nil {
			return fmt.Errorf("copy file %q: %w", o.Name, err)
		}
		if err := oldDir.DeleteChildFile(ctx, nameDiff, o.Generation, &o.MetaGeneration); err != nil {
			return fmt.Errorf("delete file %q: %w", o.Name, err)
		}

		if err = fs.invalidateChildFileCacheIfExist(oldDir, o.Name); err != nil {
			return fmt.Errorf("invalidate cache for file %q: %w", o.Name, err)
		}
	}

	// Release both directory inodes before touching the old parent.
	fs.releaseInodes(&pendingInodes)

	// Delete the backing object of the old directory.
	fs.mu.Lock()
	_, isImplicitDir := fs.implicitDirInodes[oldDir.Name()]
	fs.mu.Unlock()
	oldParent.Lock()
	err = oldParent.DeleteChildDir(ctx, oldName, isImplicitDir, oldDir)
	oldParent.Unlock()
	if err != nil {
		return fmt.Errorf("DeleteChildDir: %w", err)
	}

	return nil
}