in antlir/btrfs_diff/subvolume.py [0:0]
def apply_item(self, item: SendStreamItem) -> None:
    """Apply a single send-stream item to this subvolume.

    Items are handled in one of three ways:

      - Items whose type is a key of `_DUMP_ITEM_TO_INCOMPLETE_INODE`
        create a new inode: a fresh ID is taken from `self.id_map`,
        registered at `item.path` (as a directory for `mkdir`, as a file
        otherwise), and an instance of the mapped class is stored in
        `self.id_to_inode`.
      - `rename`, `unlink`, `rmdir` and `link` only manipulate the
        path-to-ID mapping (plus `self._delete()` for removals).
      - Every other item is forwarded to `apply_item()` of the inode
        found at `item.path`.

    Raises:
        RuntimeError: if the item cannot be applied -- e.g. a missing
            source or target path, a rename of a path into its own
            subdirectory, a directory / non-directory overwrite mismatch,
            `unlink` of a directory, `rmdir` of a non-directory, or a
            hardlink to a directory or onto an existing path.
    """
    # --- Inode-creating items -------------------------------------------
    for creator_type, incomplete_cls in _DUMP_ITEM_TO_INCOMPLETE_INODE.items():
        if not isinstance(item, creator_type):
            continue
        fresh_id = self.id_map.next()
        if isinstance(item, SendStreamItems.mkdir):
            self.id_map.add_dir(fresh_id, item.path)
        else:
            self.id_map.add_file(fresh_id, item.path)
        assert fresh_id not in self.id_to_inode
        # pyre-fixme[16]: This is supposed to be frozen!!!
        self.id_to_inode[fresh_id] = incomplete_cls(item=item)
        return  # Done applying item

    # --- rename ---------------------------------------------------------
    if isinstance(item, SendStreamItems.rename):
        if item.dest.startswith(item.path + b"/"):
            raise RuntimeError(f"{item} makes path its own subdirectory")

        src_id = self.id_map.get_id(item.path)
        if src_id is None:
            raise RuntimeError(f"source of {item} does not exist")
        dest_id = self.id_map.get_id(item.dest)

        # Per `rename (2)`, renaming same-inode links has NO effect o_O
        if src_id == dest_id:
            return

        # Only an existing destination needs validation and removal; with
        # no destination path the rename is a plain move.
        if dest_id is not None:
            src_is_dir = isinstance(self.id_to_inode[src_id], IncompleteDir)
            dest_ino = self.id_to_inode[dest_id]
            dest_is_dir = isinstance(dest_ino, IncompleteDir)
            # NB: Per `rename (2)`, if either the new or the old inode is
            # a symbolic link, they get treated just as regular files.
            if src_is_dir and not dest_is_dir:
                raise RuntimeError(
                    f"{item} cannot overwrite {dest_ino}, since a "
                    "directory may only overwrite an empty directory"
                )
            if dest_is_dir and not src_is_dir:
                raise RuntimeError(
                    f"{item} cannot overwrite a directory with a non-directory"
                )
            # _delete() will ensure that the destination is empty
            self._delete(item.dest)

        self.id_map.rename_path(item.path, item.dest)
        return

    # --- unlink ---------------------------------------------------------
    if isinstance(item, SendStreamItems.unlink):
        if isinstance(self.inode_at_path(item.path), IncompleteDir):
            raise RuntimeError(f"Cannot {item} a directory")
        self._delete(item.path)
        return

    # --- rmdir ----------------------------------------------------------
    if isinstance(item, SendStreamItems.rmdir):
        if not isinstance(self.inode_at_path(item.path), IncompleteDir):
            raise RuntimeError(f"Can only {item} a directory")
        self._delete(item.path)
        return

    # --- link -----------------------------------------------------------
    if isinstance(item, SendStreamItems.link):
        # `item.path` is the name being created, `item.dest` is the
        # already-existing path whose inode it will share.
        if self.id_map.get_id(item.path) is not None:
            raise RuntimeError(f"Destination of {item} already exists")
        target_id = self.id_map.get_id(item.dest)
        if target_id is None:
            raise RuntimeError(f"{item} source does not exist")
        if isinstance(self.id_to_inode[target_id], IncompleteDir):
            raise RuntimeError(f"Cannot {item} a directory")
        self.id_map.add_file(target_id, item.path)
        return

    # --- Any other operation must be handled at inode scope. ------------
    if self.inode_at_path(item.path) is None:
        raise RuntimeError(f"Cannot apply {item}, path does not exist")
    # pyre-fixme[16]: Inode doesn't have apply_item() ...
    self._require_inode_at_path(item, item.path).apply_item(item=item)