sync/worktree.py (183 lines of code) (raw):
import os
import shutil
import traceback
from datetime import datetime, timedelta
import git
import pygit2
from . import log
from .base import ProcessName
from .env import Environment
from .lock import MutGuard, SyncLock, mut
from .repos import pygit2_get, wrapper_get
from git.repo.base import Repo
from pygit2 import Worktree as PyGit2Worktree
from pygit2.repository import Repository
from typing import Any, Iterator, Optional, Tuple
# Shared environment object; `env.config` is read below for the worktree root
# path and the per-repository "max-count" limit.
env = Environment()
# Module-level logger, named after this module.
logger = log.get_logger(__name__)
def cleanup(git_gecko: Repo, git_wpt: Repo) -> None:
    """Run worktree cleanup for the gecko repository and then the wpt repository.

    Each repository is cleaned with its own configured maximum worktree
    count, as returned by get_max_worktree_count().
    """
    for repo in (git_gecko, git_wpt):
        cleanup_repo(repo, pygit2_get(repo), get_max_worktree_count(repo))
def cleanup_repo(repo: Repo, pygit2_repo: Repository, max_count: Optional[int] = None) -> None:
    """Delete stale or surplus worktrees registered in ``pygit2_repo``.

    Every registered worktree whose directory still exists is put in one of
    three groups:

    * unprunable: the worktree name does not map to a ProcessName. These are
      counted but never deleted here.
    * maybe_prunable: HEAD has no branch name, or the HEAD branch does not map
      to the same ProcessName as the worktree name.
    * prunable: the HEAD branch maps to the same ProcessName as the worktree
      name.

    Candidates are visited prunable first, then maybe_prunable, each group
    oldest first by the mtime of the worktree directory. A candidate is
    deleted if its mtime is more than two days old, or while the total number
    of worktrees still exceeds ``max_count``.

    :param repo: Repository handle passed through to delete_worktree().
    :param pygit2_repo: pygit2 repository whose worktrees are examined.
    :param max_count: Maximum number of worktrees to keep. A falsy value
                      disables count-based deletion; age-based deletion
                      still applies.
    """
    # TODO: Always cleanup repos where the sync is finished
    prune_worktrees(pygit2_repo)

    unprunable = []
    maybe_prunable = []
    prunable = []
    now = datetime.now()

    for worktree in worktrees(pygit2_repo):
        if not os.path.exists(worktree.path):
            # Directory is gone; just drop the stale registration.
            worktree.prune(True)
            continue

        process_name = ProcessName.from_tuple(worktree.name.split("-"))
        worktree_data = (datetime.fromtimestamp(os.stat(worktree.path).st_mtime),
                         process_name,
                         worktree)

        if process_name is None:
            logger.warning("Worktree doesn't correspond to a sync %s" % worktree.path)
            unprunable.append(worktree_data)
            continue

        # Open the checkout itself to inspect its HEAD. A separate local is
        # used so the `pygit2_repo` argument is not rebound inside the loop.
        worktree_repo = pygit2.Repository(worktree.path)
        head_branch = worktree_repo.head.name
        if not head_branch or head_branch == "HEAD":
            logger.warning("No branch associated with worktree %s" % worktree.path)
            maybe_prunable.append(worktree_data)
            continue

        if head_branch.startswith("refs/heads/"):
            head_branch = head_branch[len("refs/heads/"):]

        branch_process_name = ProcessName.from_path(head_branch)
        if branch_process_name is None:
            logger.warning("No sync head associated with worktree %s" % worktree.path)
            maybe_prunable.append(worktree_data)
            continue

        if branch_process_name != process_name:
            logger.warning("Head branch doesn't match worktree %s" % worktree.path)
            maybe_prunable.append(worktree_data)
            continue

        prunable.append(worktree_data)

    if max_count and len(unprunable) > max_count:
        logger.error("Unable to cleanup worktrees, because there are too many unprunable worktrees")

    if not max_count:
        delete_count = 0
    else:
        delete_count = max(len(unprunable) + len(prunable) + len(maybe_prunable) - max_count, 0)

    # Order by modification time only. Sorting the raw tuples would fall back
    # to comparing the ProcessName / worktree objects whenever two timestamps
    # are equal, which raises TypeError if those objects are not orderable.
    prunable.sort(key=lambda item: item[0])
    maybe_prunable.sort(key=lambda item: item[0])

    cutoff = now - timedelta(days=2)
    for mtime, process_name, worktree in prunable + maybe_prunable:
        assert process_name is not None
        if mtime < cutoff:
            logger.info("Removing worktree without recent activity %s" % worktree.path)
            delete_worktree(repo, process_name, worktree)
            delete_count -= 1
        elif delete_count > 0:
            logger.info("Removing LRU worktree %s" % worktree.path)
            delete_worktree(repo, process_name, worktree)
            delete_count -= 1
        # Otherwise the worktree is recent and within quota, so it is kept.
        # Scanning continues rather than stopping: the combined list is two
        # separately sorted runs, so an expired maybe_prunable entry can
        # follow a recent prunable one.
def delete_worktree(repo: Repo, process_name: ProcessName, worktree: PyGit2Worktree) -> None:
    """Remove a worktree's directory and prune its registration.

    The worktree path must lie under the configured worktrees root. While
    holding the SyncLock for ``process_name`` the directory tree is removed
    (a failure is logged, not raised), the worktree is pruned, and the
    repository wrapper's after_worktree_delete() hook is called with the path.

    :param repo: Repository used to look up the wrapper whose hook is called.
    :param process_name: Process whose lock is taken for the deletion.
    :param worktree: The pygit2 worktree to remove.
    """
    # Safety guard: only ever delete below <root>/<worktrees>.
    assert worktree.path.startswith(
        os.path.join(env.config["root"], env.config["paths"]["worktrees"])
    )

    with SyncLock.for_process(process_name):
        target = worktree.path
        try:
            logger.info("Deleting path %s" % target)
            shutil.rmtree(target)
        except Exception:
            failure = traceback.format_exc()
            logger.warning("Failed to remove worktree %s:%s" % (target, failure))
        else:
            logger.info(f"Removed worktree {target}")

        worktree.prune(True)

        repo_wrapper = wrapper_get(repo)
        assert repo_wrapper is not None
        repo_wrapper.after_worktree_delete(worktree.path)
def worktrees(pygit2_repo: Repository) -> Iterator[PyGit2Worktree]:
    """Lazily yield the worktree object for each name listed by ``pygit2_repo``.

    Names come from ``list_worktrees()`` and each one is resolved with
    ``lookup_worktree()`` only when the consumer asks for the next item.
    """
    yield from (
        pygit2_repo.lookup_worktree(worktree_name)
        for worktree_name in pygit2_repo.list_worktrees()
    )
def prune_worktrees(pygit2_repo: Repository) -> None:
    """Prune each worktree that is reported prunable or whose directory is missing."""
    for entry in worktrees(pygit2_repo):
        # For some reason libgit2 thinks worktrees are not prunable when their
        # working dir is gone, so the directory is checked explicitly as well.
        if not entry.is_prunable and os.path.exists(entry.path):
            continue
        logger.info("Deleting worktree at path %s" % entry.path)
        entry.prune(True)
def get_max_worktree_count(repo: Repo) -> Optional[Any]:
    """Return the configured worktree limit for ``repo``, or None if there is none.

    The limit is read from ``env.config[<wrapper name>]["worktree"]["max-count"]``.
    None is returned when the repository has no wrapper, when the configured
    value is falsy, or when it converts to an integer that is not positive.
    """
    repo_wrapper = wrapper_get(repo)
    if repo_wrapper:
        configured = env.config[repo_wrapper.name]["worktree"]["max-count"]
        if configured:
            limit = int(configured)
            if limit > 0:
                return limit
    return None
class Worktree:
    """Wrapper for accessing a git worktree for a specific process.

    To access the worktree call .get()
    """

    def __init__(self, repo: Repo, process_name: ProcessName) -> None:
        """Record the repository and process and compute the worktree name and path.

        Nothing is created on disk here; the worktree is set up lazily by get().

        :param repo: Repository the worktree belongs to.
        :param process_name: Process the worktree is for; its tuple form gives
                             the worktree name and its subtype / obj_id give
                             the last two path components.
        """
        self.repo = repo
        self.pygit2_repo = pygit2_get(repo)
        # Cached repository handle for the worktree; set by get(), cleared by delete().
        self._worktree: Optional[Repo] = None
        self.process_name = process_name
        self.worktree_name = "-".join(str(item) for item in self.process_name.as_tuple())
        working_dir = repo.working_dir
        assert working_dir is not None
        # <root>/<worktrees dir>/<repo directory name>/<subtype>/<obj_id>
        self.path = os.path.join(env.config["root"],
                                 env.config["paths"]["worktrees"],
                                 os.path.basename(working_dir),
                                 process_name.subtype,
                                 process_name.obj_id)
        # NOTE(review): presumably managed by the MutGuard / @mut machinery
        # from .lock; confirm there.
        self._lock = None

    def as_mut(self, lock: SyncLock) -> MutGuard:
        """Return a MutGuard built from ``lock`` and this object."""
        return MutGuard(lock, self)

    @property
    def lock_key(self) -> Tuple[str, str]:
        """The (subtype, obj_id) pair of the process this worktree is for."""
        return (self.process_name.subtype, self.process_name.obj_id)

    @mut()
    def get(self) -> Repo:
        """Return the repository handle for the worktree, creating it if needed.

        On first access (or first access after delete()) the worktree is looked
        up by name. If it is not registered it is created at ``self.path`` from
        the branch ``refs/heads/<process_name>``, after removing any leftover
        content at that path. If the configured worktree limit has been
        reached, cleanup_repo() is run first to make room. Subsequent calls
        return the cached handle without repeating these checks, so it's
        possible to retain state within a specific process."""
        # TODO: We can get the worktree to only checkout the paths we actually
        # need.
        # To do this we have to
        # * Enable sparse checkouts by setting core.sparseCheckouts
        # * Add the worktree with --no-checkout
        # * Add the list of paths to check out under $REPO/worktrees/info/sparse-checkout
        # * Go to the worktree and check it out
        if self._worktree is None:
            all_worktrees = {item.name: item for item in worktrees(self.pygit2_repo)}
            count = len(all_worktrees)
            max_count = get_max_worktree_count(self.repo)
            if max_count and count >= max_count:
                # Leave room for the worktree we may be about to add.
                cleanup_repo(self.repo, self.pygit2_repo, max_count - 1)

            # Checked after cleanup, which may have removed this very worktree.
            path_exists = os.path.exists(self.path)

            if self.worktree_name in all_worktrees and not path_exists:
                # Registered but the directory is gone: drop the stale entry
                # so the worktree is recreated below.
                prune_worktrees(self.pygit2_repo)
                del all_worktrees[self.worktree_name]

            if self.worktree_name not in all_worktrees:
                if path_exists:
                    logger.warning("Found existing content in worktree path %s, removing" %
                                   self.path)
                    shutil.rmtree(self.path)
                logger.info(f"Creating worktree {self.worktree_name} at {self.path}")
                # exist_ok avoids the check-then-create race of a separate
                # os.path.exists() test.
                os.makedirs(os.path.dirname(self.path), exist_ok=True)
                self.pygit2_repo.add_worktree(self.worktree_name,
                                              os.path.abspath(self.path),
                                              self.pygit2_repo.lookup_reference(
                                                  "refs/heads/%s" % self.process_name))
                wrapper = wrapper_get(self.repo)
                assert wrapper is not None
                wrapper.after_worktree_create(self.path)
            else:
                worktree = self.pygit2_repo.lookup_worktree(self.worktree_name)
                assert os.path.exists(self.path)
                assert worktree.path == self.path
            self._worktree = git.Repo(self.path)
            # TODO: In general the worktree should be on the right branch, but it would
            # be good to check. In the specific case of landing, we move the wpt worktree
            # around various commits, so it isn't necessarily on the correct branch
        assert self._worktree is not None
        return self._worktree

    @mut()
    def delete(self) -> None:
        """Delete this process's worktree, if its directory exists.

        The worktree is looked up by name first and, failing that, by matching
        ``self.path`` against the registered worktrees. If no registered
        worktree is found nothing is deleted. The cached handle from get() is
        cleared whenever the directory is absent or a deletion was attempted,
        so a later get() does not return a handle to a removed directory.
        """
        if not os.path.exists(self.path):
            # Nothing on disk; any cached handle would point at a missing directory.
            self._worktree = None
            return

        try:
            worktree = self.pygit2_repo.lookup_worktree(self.worktree_name)
        except Exception:
            # Lookup by name failed; fall back to searching by path below.
            worktree = None

        if worktree is None:
            worktree = next((item for item in worktrees(self.pygit2_repo)
                             if item.path == self.path), None)
            if worktree is None:
                # No worktree found
                return

        assert worktree.path == self.path
        delete_worktree(self.repo, self.process_name, worktree)
        # Force the next get() to look the worktree up (and recreate it) again.
        self._worktree = None